summary refs log tree commit diff
path: root/synapse/storage/stats.py
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2019-10-21 12:56:42 +0100
committerErik Johnston <erik@matrix.org>2019-10-21 16:05:06 +0100
commitc66a06ac6b69b0a03f5c6284ded980399e9df94e (patch)
tree01dfd3b9098a9ace759403744d122c18efbd97ff /synapse/storage/stats.py
parentMerge branch 'master' into develop (diff)
downloadsynapse-c66a06ac6b69b0a03f5c6284ded980399e9df94e.tar.xz
Move storage classes into a main "data store".
This is in preparation for having multiple data stores that offer
different functionality, e.g. splitting out state or event storage.
Diffstat (limited to 'synapse/storage/stats.py')
-rw-r--r--synapse/storage/stats.py881
1 files changed, 0 insertions, 881 deletions
diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py
deleted file mode 100644
index 7c224cd3d9..0000000000
--- a/synapse/storage/stats.py
+++ /dev/null
@@ -1,881 +0,0 @@
-# -*- coding: utf-8 -*-
-# Copyright 2018, 2019 New Vector Ltd
-# Copyright 2019 The Matrix.org Foundation C.I.C.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import logging
-from itertools import chain
-
-from twisted.internet import defer
-from twisted.internet.defer import DeferredLock
-
-from synapse.api.constants import EventTypes, Membership
-from synapse.storage import PostgresEngine
-from synapse.storage.state_deltas import StateDeltasStore
-from synapse.util.caches.descriptors import cached
-
-logger = logging.getLogger(__name__)
-
-# these fields track absolutes (e.g. total number of rooms on the server)
-# You can think of these as Prometheus Gauges.
-# You can draw these stats on a line graph.
-# Example: number of users in a room
-ABSOLUTE_STATS_FIELDS = {
-    "room": (
-        "current_state_events",
-        "joined_members",
-        "invited_members",
-        "left_members",
-        "banned_members",
-        "local_users_in_room",
-    ),
-    "user": ("joined_rooms",),
-}
-
-# these fields are per-timeslice and so should be reset to 0 upon a new slice
-# You can draw these stats on a histogram.
-# Example: number of events sent locally during a time slice
-PER_SLICE_FIELDS = {
-    "room": ("total_events", "total_event_bytes"),
-    "user": ("invites_sent", "rooms_created", "total_events", "total_event_bytes"),
-}
-
-TYPE_TO_TABLE = {"room": ("room_stats", "room_id"), "user": ("user_stats", "user_id")}
-
-# these are the tables (& ID columns) which contain our actual subjects
-TYPE_TO_ORIGIN_TABLE = {"room": ("rooms", "room_id"), "user": ("users", "name")}
-
-
-class StatsStore(StateDeltasStore):
-    def __init__(self, db_conn, hs):
-        super(StatsStore, self).__init__(db_conn, hs)
-
-        self.server_name = hs.hostname
-        self.clock = self.hs.get_clock()
-        self.stats_enabled = hs.config.stats_enabled
-        self.stats_bucket_size = hs.config.stats_bucket_size
-
-        self.stats_delta_processing_lock = DeferredLock()
-
-        self.register_background_update_handler(
-            "populate_stats_process_rooms", self._populate_stats_process_rooms
-        )
-        self.register_background_update_handler(
-            "populate_stats_process_users", self._populate_stats_process_users
-        )
-        # we no longer need to perform clean-up, but we will give ourselves
-        # the potential to reintroduce it in the future – so documentation
-        # will still encourage the use of this no-op handler.
-        self.register_noop_background_update("populate_stats_cleanup")
-        self.register_noop_background_update("populate_stats_prepare")
-
-    def quantise_stats_time(self, ts):
-        """
-        Quantises a timestamp to be a multiple of the bucket size.
-
-        Args:
-            ts (int): the timestamp to quantise, in milliseconds since the Unix
-                Epoch
-
-        Returns:
-            int: a timestamp which
-              - is divisible by the bucket size;
-              - is no later than `ts`; and
-              - is the largest such timestamp.
-        """
-        return (ts // self.stats_bucket_size) * self.stats_bucket_size
-
-    @defer.inlineCallbacks
-    def _populate_stats_process_users(self, progress, batch_size):
-        """
-        This is a background update which regenerates statistics for users.
-        """
-        if not self.stats_enabled:
-            yield self._end_background_update("populate_stats_process_users")
-            return 1
-
-        last_user_id = progress.get("last_user_id", "")
-
-        def _get_next_batch(txn):
-            sql = """
-                    SELECT DISTINCT name FROM users
-                    WHERE name > ?
-                    ORDER BY name ASC
-                    LIMIT ?
-                """
-            txn.execute(sql, (last_user_id, batch_size))
-            return [r for r, in txn]
-
-        users_to_work_on = yield self.runInteraction(
-            "_populate_stats_process_users", _get_next_batch
-        )
-
-        # No more rooms -- complete the transaction.
-        if not users_to_work_on:
-            yield self._end_background_update("populate_stats_process_users")
-            return 1
-
-        for user_id in users_to_work_on:
-            yield self._calculate_and_set_initial_state_for_user(user_id)
-            progress["last_user_id"] = user_id
-
-        yield self.runInteraction(
-            "populate_stats_process_users",
-            self._background_update_progress_txn,
-            "populate_stats_process_users",
-            progress,
-        )
-
-        return len(users_to_work_on)
-
-    @defer.inlineCallbacks
-    def _populate_stats_process_rooms(self, progress, batch_size):
-        """
-        This is a background update which regenerates statistics for rooms.
-        """
-        if not self.stats_enabled:
-            yield self._end_background_update("populate_stats_process_rooms")
-            return 1
-
-        last_room_id = progress.get("last_room_id", "")
-
-        def _get_next_batch(txn):
-            sql = """
-                    SELECT DISTINCT room_id FROM current_state_events
-                    WHERE room_id > ?
-                    ORDER BY room_id ASC
-                    LIMIT ?
-                """
-            txn.execute(sql, (last_room_id, batch_size))
-            return [r for r, in txn]
-
-        rooms_to_work_on = yield self.runInteraction(
-            "populate_stats_rooms_get_batch", _get_next_batch
-        )
-
-        # No more rooms -- complete the transaction.
-        if not rooms_to_work_on:
-            yield self._end_background_update("populate_stats_process_rooms")
-            return 1
-
-        for room_id in rooms_to_work_on:
-            yield self._calculate_and_set_initial_state_for_room(room_id)
-            progress["last_room_id"] = room_id
-
-        yield self.runInteraction(
-            "_populate_stats_process_rooms",
-            self._background_update_progress_txn,
-            "populate_stats_process_rooms",
-            progress,
-        )
-
-        return len(rooms_to_work_on)
-
-    def get_stats_positions(self):
-        """
-        Returns the stats processor positions.
-        """
-        return self._simple_select_one_onecol(
-            table="stats_incremental_position",
-            keyvalues={},
-            retcol="stream_id",
-            desc="stats_incremental_position",
-        )
-
-    def update_room_state(self, room_id, fields):
-        """
-        Args:
-            room_id (str)
-            fields (dict[str:Any])
-        """
-
-        # For whatever reason some of the fields may contain null bytes, which
-        # postgres isn't a fan of, so we replace those fields with null.
-        for col in (
-            "join_rules",
-            "history_visibility",
-            "encryption",
-            "name",
-            "topic",
-            "avatar",
-            "canonical_alias",
-        ):
-            field = fields.get(col)
-            if field and "\0" in field:
-                fields[col] = None
-
-        return self._simple_upsert(
-            table="room_stats_state",
-            keyvalues={"room_id": room_id},
-            values=fields,
-            desc="update_room_state",
-        )
-
-    def get_statistics_for_subject(self, stats_type, stats_id, start, size=100):
-        """
-        Get statistics for a given subject.
-
-        Args:
-            stats_type (str): The type of subject
-            stats_id (str): The ID of the subject (e.g. room_id or user_id)
-            start (int): Pagination start. Number of entries, not timestamp.
-            size (int): How many entries to return.
-
-        Returns:
-            Deferred[list[dict]], where the dict has the keys of
-            ABSOLUTE_STATS_FIELDS[stats_type],  and "bucket_size" and "end_ts".
-        """
-        return self.runInteraction(
-            "get_statistics_for_subject",
-            self._get_statistics_for_subject_txn,
-            stats_type,
-            stats_id,
-            start,
-            size,
-        )
-
-    def _get_statistics_for_subject_txn(
-        self, txn, stats_type, stats_id, start, size=100
-    ):
-        """
-        Transaction-bound version of L{get_statistics_for_subject}.
-        """
-
-        table, id_col = TYPE_TO_TABLE[stats_type]
-        selected_columns = list(
-            ABSOLUTE_STATS_FIELDS[stats_type] + PER_SLICE_FIELDS[stats_type]
-        )
-
-        slice_list = self._simple_select_list_paginate_txn(
-            txn,
-            table + "_historical",
-            {id_col: stats_id},
-            "end_ts",
-            start,
-            size,
-            retcols=selected_columns + ["bucket_size", "end_ts"],
-            order_direction="DESC",
-        )
-
-        return slice_list
-
-    def get_room_stats_state(self, room_id):
-        """
-        Returns the current room_stats_state for a room.
-
-        Args:
-            room_id (str): The ID of the room to return state for.
-
-        Returns (dict):
-            Dictionary containing these keys:
-                "name", "topic", "canonical_alias", "avatar", "join_rules",
-                "history_visibility"
-        """
-        return self._simple_select_one(
-            "room_stats_state",
-            {"room_id": room_id},
-            retcols=(
-                "name",
-                "topic",
-                "canonical_alias",
-                "avatar",
-                "join_rules",
-                "history_visibility",
-            ),
-        )
-
-    @cached()
-    def get_earliest_token_for_stats(self, stats_type, id):
-        """
-        Fetch the "earliest token". This is used by the room stats delta
-        processor to ignore deltas that have been processed between the
-        start of the background task and any particular room's stats
-        being calculated.
-
-        Returns:
-            Deferred[int]
-        """
-        table, id_col = TYPE_TO_TABLE[stats_type]
-
-        return self._simple_select_one_onecol(
-            "%s_current" % (table,),
-            keyvalues={id_col: id},
-            retcol="completed_delta_stream_id",
-            allow_none=True,
-        )
-
-    def bulk_update_stats_delta(self, ts, updates, stream_id):
-        """Bulk update stats tables for a given stream_id and updates the stats
-        incremental position.
-
-        Args:
-            ts (int): Current timestamp in ms
-            updates(dict[str, dict[str, dict[str, Counter]]]): The updates to
-                commit as a mapping stats_type -> stats_id -> field -> delta.
-            stream_id (int): Current position.
-
-        Returns:
-            Deferred
-        """
-
-        def _bulk_update_stats_delta_txn(txn):
-            for stats_type, stats_updates in updates.items():
-                for stats_id, fields in stats_updates.items():
-                    logger.info(
-                        "Updating %s stats for %s: %s", stats_type, stats_id, fields
-                    )
-                    self._update_stats_delta_txn(
-                        txn,
-                        ts=ts,
-                        stats_type=stats_type,
-                        stats_id=stats_id,
-                        fields=fields,
-                        complete_with_stream_id=stream_id,
-                    )
-
-            self._simple_update_one_txn(
-                txn,
-                table="stats_incremental_position",
-                keyvalues={},
-                updatevalues={"stream_id": stream_id},
-            )
-
-        return self.runInteraction(
-            "bulk_update_stats_delta", _bulk_update_stats_delta_txn
-        )
-
-    def update_stats_delta(
-        self,
-        ts,
-        stats_type,
-        stats_id,
-        fields,
-        complete_with_stream_id,
-        absolute_field_overrides=None,
-    ):
-        """
-        Updates the statistics for a subject, with a delta (difference/relative
-        change).
-
-        Args:
-            ts (int): timestamp of the change
-            stats_type (str): "room" or "user" – the kind of subject
-            stats_id (str): the subject's ID (room ID or user ID)
-            fields (dict[str, int]): Deltas of stats values.
-            complete_with_stream_id (int, optional):
-                If supplied, converts an incomplete row into a complete row,
-                with the supplied stream_id marked as the stream_id where the
-                row was completed.
-            absolute_field_overrides (dict[str, int]): Current stats values
-                (i.e. not deltas) of absolute fields.
-                Does not work with per-slice fields.
-        """
-
-        return self.runInteraction(
-            "update_stats_delta",
-            self._update_stats_delta_txn,
-            ts,
-            stats_type,
-            stats_id,
-            fields,
-            complete_with_stream_id=complete_with_stream_id,
-            absolute_field_overrides=absolute_field_overrides,
-        )
-
-    def _update_stats_delta_txn(
-        self,
-        txn,
-        ts,
-        stats_type,
-        stats_id,
-        fields,
-        complete_with_stream_id,
-        absolute_field_overrides=None,
-    ):
-        if absolute_field_overrides is None:
-            absolute_field_overrides = {}
-
-        table, id_col = TYPE_TO_TABLE[stats_type]
-
-        quantised_ts = self.quantise_stats_time(int(ts))
-        end_ts = quantised_ts + self.stats_bucket_size
-
-        # Lets be paranoid and check that all the given field names are known
-        abs_field_names = ABSOLUTE_STATS_FIELDS[stats_type]
-        slice_field_names = PER_SLICE_FIELDS[stats_type]
-        for field in chain(fields.keys(), absolute_field_overrides.keys()):
-            if field not in abs_field_names and field not in slice_field_names:
-                # guard against potential SQL injection dodginess
-                raise ValueError(
-                    "%s is not a recognised field"
-                    " for stats type %s" % (field, stats_type)
-                )
-
-        # Per slice fields do not get added to the _current table
-
-        # This calculates the deltas (`field = field + ?` values)
-        # for absolute fields,
-        # * defaulting to 0 if not specified
-        #     (required for the INSERT part of upserting to work)
-        # * omitting overrides specified in `absolute_field_overrides`
-        deltas_of_absolute_fields = {
-            key: fields.get(key, 0)
-            for key in abs_field_names
-            if key not in absolute_field_overrides
-        }
-
-        # Keep the delta stream ID field up to date
-        absolute_field_overrides = absolute_field_overrides.copy()
-        absolute_field_overrides["completed_delta_stream_id"] = complete_with_stream_id
-
-        # first upsert the `_current` table
-        self._upsert_with_additive_relatives_txn(
-            txn=txn,
-            table=table + "_current",
-            keyvalues={id_col: stats_id},
-            absolutes=absolute_field_overrides,
-            additive_relatives=deltas_of_absolute_fields,
-        )
-
-        per_slice_additive_relatives = {
-            key: fields.get(key, 0) for key in slice_field_names
-        }
-        self._upsert_copy_from_table_with_additive_relatives_txn(
-            txn=txn,
-            into_table=table + "_historical",
-            keyvalues={id_col: stats_id},
-            extra_dst_insvalues={"bucket_size": self.stats_bucket_size},
-            extra_dst_keyvalues={"end_ts": end_ts},
-            additive_relatives=per_slice_additive_relatives,
-            src_table=table + "_current",
-            copy_columns=abs_field_names,
-        )
-
-    def _upsert_with_additive_relatives_txn(
-        self, txn, table, keyvalues, absolutes, additive_relatives
-    ):
-        """Used to update values in the stats tables.
-
-        This is basically a slightly convoluted upsert that *adds* to any
-        existing rows.
-
-        Args:
-            txn
-            table (str): Table name
-            keyvalues (dict[str, any]): Row-identifying key values
-            absolutes (dict[str, any]): Absolute (set) fields
-            additive_relatives (dict[str, int]): Fields that will be added onto
-                if existing row present.
-        """
-        if self.database_engine.can_native_upsert:
-            absolute_updates = [
-                "%(field)s = EXCLUDED.%(field)s" % {"field": field}
-                for field in absolutes.keys()
-            ]
-
-            relative_updates = [
-                "%(field)s = EXCLUDED.%(field)s + %(table)s.%(field)s"
-                % {"table": table, "field": field}
-                for field in additive_relatives.keys()
-            ]
-
-            insert_cols = []
-            qargs = []
-
-            for (key, val) in chain(
-                keyvalues.items(), absolutes.items(), additive_relatives.items()
-            ):
-                insert_cols.append(key)
-                qargs.append(val)
-
-            sql = """
-                INSERT INTO %(table)s (%(insert_cols_cs)s)
-                VALUES (%(insert_vals_qs)s)
-                ON CONFLICT (%(key_columns)s) DO UPDATE SET %(updates)s
-            """ % {
-                "table": table,
-                "insert_cols_cs": ", ".join(insert_cols),
-                "insert_vals_qs": ", ".join(
-                    ["?"] * (len(keyvalues) + len(absolutes) + len(additive_relatives))
-                ),
-                "key_columns": ", ".join(keyvalues),
-                "updates": ", ".join(chain(absolute_updates, relative_updates)),
-            }
-
-            txn.execute(sql, qargs)
-        else:
-            self.database_engine.lock_table(txn, table)
-            retcols = list(chain(absolutes.keys(), additive_relatives.keys()))
-            current_row = self._simple_select_one_txn(
-                txn, table, keyvalues, retcols, allow_none=True
-            )
-            if current_row is None:
-                merged_dict = {**keyvalues, **absolutes, **additive_relatives}
-                self._simple_insert_txn(txn, table, merged_dict)
-            else:
-                for (key, val) in additive_relatives.items():
-                    current_row[key] += val
-                current_row.update(absolutes)
-                self._simple_update_one_txn(txn, table, keyvalues, current_row)
-
-    def _upsert_copy_from_table_with_additive_relatives_txn(
-        self,
-        txn,
-        into_table,
-        keyvalues,
-        extra_dst_keyvalues,
-        extra_dst_insvalues,
-        additive_relatives,
-        src_table,
-        copy_columns,
-    ):
-        """Updates the historic stats table with latest updates.
-
-        This involves copying "absolute" fields from the `_current` table, and
-        adding relative fields to any existing values.
-
-        Args:
-             txn: Transaction
-             into_table (str): The destination table to UPSERT the row into
-             keyvalues (dict[str, any]): Row-identifying key values
-             extra_dst_keyvalues (dict[str, any]): Additional keyvalues
-                for `into_table`.
-             extra_dst_insvalues (dict[str, any]): Additional values to insert
-                on new row creation for `into_table`.
-             additive_relatives (dict[str, any]): Fields that will be added onto
-                if existing row present. (Must be disjoint from copy_columns.)
-             src_table (str): The source table to copy from
-             copy_columns (iterable[str]): The list of columns to copy
-        """
-        if self.database_engine.can_native_upsert:
-            ins_columns = chain(
-                keyvalues,
-                copy_columns,
-                additive_relatives,
-                extra_dst_keyvalues,
-                extra_dst_insvalues,
-            )
-            sel_exprs = chain(
-                keyvalues,
-                copy_columns,
-                (
-                    "?"
-                    for _ in chain(
-                        additive_relatives, extra_dst_keyvalues, extra_dst_insvalues
-                    )
-                ),
-            )
-            keyvalues_where = ("%s = ?" % f for f in keyvalues)
-
-            sets_cc = ("%s = EXCLUDED.%s" % (f, f) for f in copy_columns)
-            sets_ar = (
-                "%s = EXCLUDED.%s + %s.%s" % (f, f, into_table, f)
-                for f in additive_relatives
-            )
-
-            sql = """
-                INSERT INTO %(into_table)s (%(ins_columns)s)
-                SELECT %(sel_exprs)s
-                FROM %(src_table)s
-                WHERE %(keyvalues_where)s
-                ON CONFLICT (%(keyvalues)s)
-                DO UPDATE SET %(sets)s
-            """ % {
-                "into_table": into_table,
-                "ins_columns": ", ".join(ins_columns),
-                "sel_exprs": ", ".join(sel_exprs),
-                "keyvalues_where": " AND ".join(keyvalues_where),
-                "src_table": src_table,
-                "keyvalues": ", ".join(
-                    chain(keyvalues.keys(), extra_dst_keyvalues.keys())
-                ),
-                "sets": ", ".join(chain(sets_cc, sets_ar)),
-            }
-
-            qargs = list(
-                chain(
-                    additive_relatives.values(),
-                    extra_dst_keyvalues.values(),
-                    extra_dst_insvalues.values(),
-                    keyvalues.values(),
-                )
-            )
-            txn.execute(sql, qargs)
-        else:
-            self.database_engine.lock_table(txn, into_table)
-            src_row = self._simple_select_one_txn(
-                txn, src_table, keyvalues, copy_columns
-            )
-            all_dest_keyvalues = {**keyvalues, **extra_dst_keyvalues}
-            dest_current_row = self._simple_select_one_txn(
-                txn,
-                into_table,
-                keyvalues=all_dest_keyvalues,
-                retcols=list(chain(additive_relatives.keys(), copy_columns)),
-                allow_none=True,
-            )
-
-            if dest_current_row is None:
-                merged_dict = {
-                    **keyvalues,
-                    **extra_dst_keyvalues,
-                    **extra_dst_insvalues,
-                    **src_row,
-                    **additive_relatives,
-                }
-                self._simple_insert_txn(txn, into_table, merged_dict)
-            else:
-                for (key, val) in additive_relatives.items():
-                    src_row[key] = dest_current_row[key] + val
-                self._simple_update_txn(txn, into_table, all_dest_keyvalues, src_row)
-
-    def get_changes_room_total_events_and_bytes(self, min_pos, max_pos):
-        """Fetches the counts of events in the given range of stream IDs.
-
-        Args:
-            min_pos (int)
-            max_pos (int)
-
-        Returns:
-            Deferred[dict[str, dict[str, int]]]: Mapping of room ID to field
-            changes.
-        """
-
-        return self.runInteraction(
-            "stats_incremental_total_events_and_bytes",
-            self.get_changes_room_total_events_and_bytes_txn,
-            min_pos,
-            max_pos,
-        )
-
-    def get_changes_room_total_events_and_bytes_txn(self, txn, low_pos, high_pos):
-        """Gets the total_events and total_event_bytes counts for rooms and
-        senders, in a range of stream_orderings (including backfilled events).
-
-        Args:
-            txn
-            low_pos (int): Low stream ordering
-            high_pos (int): High stream ordering
-
-        Returns:
-            tuple[dict[str, dict[str, int]], dict[str, dict[str, int]]]: The
-            room and user deltas for total_events/total_event_bytes in the
-            format of `stats_id` -> fields
-        """
-
-        if low_pos >= high_pos:
-            # nothing to do here.
-            return {}, {}
-
-        if isinstance(self.database_engine, PostgresEngine):
-            new_bytes_expression = "OCTET_LENGTH(json)"
-        else:
-            new_bytes_expression = "LENGTH(CAST(json AS BLOB))"
-
-        sql = """
-            SELECT events.room_id, COUNT(*) AS new_events, SUM(%s) AS new_bytes
-            FROM events INNER JOIN event_json USING (event_id)
-            WHERE (? < stream_ordering AND stream_ordering <= ?)
-                OR (? <= stream_ordering AND stream_ordering <= ?)
-            GROUP BY events.room_id
-        """ % (
-            new_bytes_expression,
-        )
-
-        txn.execute(sql, (low_pos, high_pos, -high_pos, -low_pos))
-
-        room_deltas = {
-            room_id: {"total_events": new_events, "total_event_bytes": new_bytes}
-            for room_id, new_events, new_bytes in txn
-        }
-
-        sql = """
-            SELECT events.sender, COUNT(*) AS new_events, SUM(%s) AS new_bytes
-            FROM events INNER JOIN event_json USING (event_id)
-            WHERE (? < stream_ordering AND stream_ordering <= ?)
-                OR (? <= stream_ordering AND stream_ordering <= ?)
-            GROUP BY events.sender
-        """ % (
-            new_bytes_expression,
-        )
-
-        txn.execute(sql, (low_pos, high_pos, -high_pos, -low_pos))
-
-        user_deltas = {
-            user_id: {"total_events": new_events, "total_event_bytes": new_bytes}
-            for user_id, new_events, new_bytes in txn
-            if self.hs.is_mine_id(user_id)
-        }
-
-        return room_deltas, user_deltas
-
-    @defer.inlineCallbacks
-    def _calculate_and_set_initial_state_for_room(self, room_id):
-        """Calculate and insert an entry into room_stats_current.
-
-        Args:
-            room_id (str)
-
-        Returns:
-            Deferred[tuple[dict, dict, int]]: A tuple of room state, membership
-            counts and stream position.
-        """
-
-        def _fetch_current_state_stats(txn):
-            pos = self.get_room_max_stream_ordering()
-
-            rows = self._simple_select_many_txn(
-                txn,
-                table="current_state_events",
-                column="type",
-                iterable=[
-                    EventTypes.Create,
-                    EventTypes.JoinRules,
-                    EventTypes.RoomHistoryVisibility,
-                    EventTypes.Encryption,
-                    EventTypes.Name,
-                    EventTypes.Topic,
-                    EventTypes.RoomAvatar,
-                    EventTypes.CanonicalAlias,
-                ],
-                keyvalues={"room_id": room_id, "state_key": ""},
-                retcols=["event_id"],
-            )
-
-            event_ids = [row["event_id"] for row in rows]
-
-            txn.execute(
-                """
-                    SELECT membership, count(*) FROM current_state_events
-                    WHERE room_id = ? AND type = 'm.room.member'
-                    GROUP BY membership
-                """,
-                (room_id,),
-            )
-            membership_counts = {membership: cnt for membership, cnt in txn}
-
-            txn.execute(
-                """
-                    SELECT COALESCE(count(*), 0) FROM current_state_events
-                    WHERE room_id = ?
-                """,
-                (room_id,),
-            )
-
-            current_state_events_count, = txn.fetchone()
-
-            users_in_room = self.get_users_in_room_txn(txn, room_id)
-
-            return (
-                event_ids,
-                membership_counts,
-                current_state_events_count,
-                users_in_room,
-                pos,
-            )
-
-        (
-            event_ids,
-            membership_counts,
-            current_state_events_count,
-            users_in_room,
-            pos,
-        ) = yield self.runInteraction(
-            "get_initial_state_for_room", _fetch_current_state_stats
-        )
-
-        state_event_map = yield self.get_events(event_ids, get_prev_content=False)
-
-        room_state = {
-            "join_rules": None,
-            "history_visibility": None,
-            "encryption": None,
-            "name": None,
-            "topic": None,
-            "avatar": None,
-            "canonical_alias": None,
-            "is_federatable": True,
-        }
-
-        for event in state_event_map.values():
-            if event.type == EventTypes.JoinRules:
-                room_state["join_rules"] = event.content.get("join_rule")
-            elif event.type == EventTypes.RoomHistoryVisibility:
-                room_state["history_visibility"] = event.content.get(
-                    "history_visibility"
-                )
-            elif event.type == EventTypes.Encryption:
-                room_state["encryption"] = event.content.get("algorithm")
-            elif event.type == EventTypes.Name:
-                room_state["name"] = event.content.get("name")
-            elif event.type == EventTypes.Topic:
-                room_state["topic"] = event.content.get("topic")
-            elif event.type == EventTypes.RoomAvatar:
-                room_state["avatar"] = event.content.get("url")
-            elif event.type == EventTypes.CanonicalAlias:
-                room_state["canonical_alias"] = event.content.get("alias")
-            elif event.type == EventTypes.Create:
-                room_state["is_federatable"] = (
-                    event.content.get("m.federate", True) is True
-                )
-
-        yield self.update_room_state(room_id, room_state)
-
-        local_users_in_room = [u for u in users_in_room if self.hs.is_mine_id(u)]
-
-        yield self.update_stats_delta(
-            ts=self.clock.time_msec(),
-            stats_type="room",
-            stats_id=room_id,
-            fields={},
-            complete_with_stream_id=pos,
-            absolute_field_overrides={
-                "current_state_events": current_state_events_count,
-                "joined_members": membership_counts.get(Membership.JOIN, 0),
-                "invited_members": membership_counts.get(Membership.INVITE, 0),
-                "left_members": membership_counts.get(Membership.LEAVE, 0),
-                "banned_members": membership_counts.get(Membership.BAN, 0),
-                "local_users_in_room": len(local_users_in_room),
-            },
-        )
-
-    @defer.inlineCallbacks
-    def _calculate_and_set_initial_state_for_user(self, user_id):
-        def _calculate_and_set_initial_state_for_user_txn(txn):
-            pos = self._get_max_stream_id_in_current_state_deltas_txn(txn)
-
-            txn.execute(
-                """
-                SELECT COUNT(distinct room_id) FROM current_state_events
-                    WHERE type = 'm.room.member' AND state_key = ?
-                        AND membership = 'join'
-                """,
-                (user_id,),
-            )
-            count, = txn.fetchone()
-            return count, pos
-
-        joined_rooms, pos = yield self.runInteraction(
-            "calculate_and_set_initial_state_for_user",
-            _calculate_and_set_initial_state_for_user_txn,
-        )
-
-        yield self.update_stats_delta(
-            ts=self.clock.time_msec(),
-            stats_type="user",
-            stats_id=user_id,
-            fields={},
-            complete_with_stream_id=pos,
-            absolute_field_overrides={"joined_rooms": joined_rooms},
-        )