diff options
author | Erik Johnston <erik@matrix.org> | 2019-10-21 12:56:42 +0100 |
---|---|---|
committer | Erik Johnston <erik@matrix.org> | 2019-10-21 16:05:06 +0100 |
commit | c66a06ac6b69b0a03f5c6284ded980399e9df94e (patch) | |
tree | 01dfd3b9098a9ace759403744d122c18efbd97ff /synapse/storage/stats.py | |
parent | Merge branch 'master' into develop (diff) | |
download | synapse-c66a06ac6b69b0a03f5c6284ded980399e9df94e.tar.xz |
Move storage classes into a main "data store".
This is in preparation for having multiple data stores that offer different functionality, e.g. splitting out state or event storage.
Diffstat (limited to 'synapse/storage/stats.py')
-rw-r--r-- | synapse/storage/stats.py | 881 |
1 files changed, 0 insertions, 881 deletions
diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py deleted file mode 100644 index 7c224cd3d9..0000000000 --- a/synapse/storage/stats.py +++ /dev/null @@ -1,881 +0,0 @@ -# -*- coding: utf-8 -*- -# Copyright 2018, 2019 New Vector Ltd -# Copyright 2019 The Matrix.org Foundation C.I.C. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import logging -from itertools import chain - -from twisted.internet import defer -from twisted.internet.defer import DeferredLock - -from synapse.api.constants import EventTypes, Membership -from synapse.storage import PostgresEngine -from synapse.storage.state_deltas import StateDeltasStore -from synapse.util.caches.descriptors import cached - -logger = logging.getLogger(__name__) - -# these fields track absolutes (e.g. total number of rooms on the server) -# You can think of these as Prometheus Gauges. -# You can draw these stats on a line graph. -# Example: number of users in a room -ABSOLUTE_STATS_FIELDS = { - "room": ( - "current_state_events", - "joined_members", - "invited_members", - "left_members", - "banned_members", - "local_users_in_room", - ), - "user": ("joined_rooms",), -} - -# these fields are per-timeslice and so should be reset to 0 upon a new slice -# You can draw these stats on a histogram. -# Example: number of events sent locally during a time slice -PER_SLICE_FIELDS = { - "room": ("total_events", "total_event_bytes"), - "user": ("invites_sent", "rooms_created", "total_events", "total_event_bytes"), -} - -TYPE_TO_TABLE = {"room": ("room_stats", "room_id"), "user": ("user_stats", "user_id")} - -# these are the tables (& ID columns) which contain our actual subjects -TYPE_TO_ORIGIN_TABLE = {"room": ("rooms", "room_id"), "user": ("users", "name")} - - -class StatsStore(StateDeltasStore): - def __init__(self, db_conn, hs): - super(StatsStore, self).__init__(db_conn, hs) - - self.server_name = hs.hostname - self.clock = self.hs.get_clock() - self.stats_enabled = hs.config.stats_enabled - self.stats_bucket_size = hs.config.stats_bucket_size - - self.stats_delta_processing_lock = DeferredLock() - - self.register_background_update_handler( - "populate_stats_process_rooms", self._populate_stats_process_rooms - ) - self.register_background_update_handler( - "populate_stats_process_users", self._populate_stats_process_users - ) - # we no longer need to perform clean-up, but we will give ourselves - # the potential to reintroduce it in the future – so documentation - # will still encourage the use of this no-op handler. - self.register_noop_background_update("populate_stats_cleanup") - self.register_noop_background_update("populate_stats_prepare") - - def quantise_stats_time(self, ts): - """ - Quantises a timestamp to be a multiple of the bucket size. - - Args: - ts (int): the timestamp to quantise, in milliseconds since the Unix - Epoch - - Returns: - int: a timestamp which - - is divisible by the bucket size; - - is no later than `ts`; and - - is the largest such timestamp. - """ - return (ts // self.stats_bucket_size) * self.stats_bucket_size - - @defer.inlineCallbacks - def _populate_stats_process_users(self, progress, batch_size): - """ - This is a background update which regenerates statistics for users. - """ - if not self.stats_enabled: - yield self._end_background_update("populate_stats_process_users") - return 1 - - last_user_id = progress.get("last_user_id", "") - - def _get_next_batch(txn): - sql = """ - SELECT DISTINCT name FROM users - WHERE name > ? - ORDER BY name ASC - LIMIT ? - """ - txn.execute(sql, (last_user_id, batch_size)) - return [r for r, in txn] - - users_to_work_on = yield self.runInteraction( - "_populate_stats_process_users", _get_next_batch - ) - - # No more rooms -- complete the transaction. - if not users_to_work_on: - yield self._end_background_update("populate_stats_process_users") - return 1 - - for user_id in users_to_work_on: - yield self._calculate_and_set_initial_state_for_user(user_id) - progress["last_user_id"] = user_id - - yield self.runInteraction( - "populate_stats_process_users", - self._background_update_progress_txn, - "populate_stats_process_users", - progress, - ) - - return len(users_to_work_on) - - @defer.inlineCallbacks - def _populate_stats_process_rooms(self, progress, batch_size): - """ - This is a background update which regenerates statistics for rooms. - """ - if not self.stats_enabled: - yield self._end_background_update("populate_stats_process_rooms") - return 1 - - last_room_id = progress.get("last_room_id", "") - - def _get_next_batch(txn): - sql = """ - SELECT DISTINCT room_id FROM current_state_events - WHERE room_id > ? - ORDER BY room_id ASC - LIMIT ? - """ - txn.execute(sql, (last_room_id, batch_size)) - return [r for r, in txn] - - rooms_to_work_on = yield self.runInteraction( - "populate_stats_rooms_get_batch", _get_next_batch - ) - - # No more rooms -- complete the transaction. - if not rooms_to_work_on: - yield self._end_background_update("populate_stats_process_rooms") - return 1 - - for room_id in rooms_to_work_on: - yield self._calculate_and_set_initial_state_for_room(room_id) - progress["last_room_id"] = room_id - - yield self.runInteraction( - "_populate_stats_process_rooms", - self._background_update_progress_txn, - "populate_stats_process_rooms", - progress, - ) - - return len(rooms_to_work_on) - - def get_stats_positions(self): - """ - Returns the stats processor positions. - """ - return self._simple_select_one_onecol( - table="stats_incremental_position", - keyvalues={}, - retcol="stream_id", - desc="stats_incremental_position", - ) - - def update_room_state(self, room_id, fields): - """ - Args: - room_id (str) - fields (dict[str:Any]) - """ - - # For whatever reason some of the fields may contain null bytes, which - # postgres isn't a fan of, so we replace those fields with null. - for col in ( - "join_rules", - "history_visibility", - "encryption", - "name", - "topic", - "avatar", - "canonical_alias", - ): - field = fields.get(col) - if field and "\0" in field: - fields[col] = None - - return self._simple_upsert( - table="room_stats_state", - keyvalues={"room_id": room_id}, - values=fields, - desc="update_room_state", - ) - - def get_statistics_for_subject(self, stats_type, stats_id, start, size=100): - """ - Get statistics for a given subject. - - Args: - stats_type (str): The type of subject - stats_id (str): The ID of the subject (e.g. room_id or user_id) - start (int): Pagination start. Number of entries, not timestamp. - size (int): How many entries to return. - - Returns: - Deferred[list[dict]], where the dict has the keys of - ABSOLUTE_STATS_FIELDS[stats_type], and "bucket_size" and "end_ts". - """ - return self.runInteraction( - "get_statistics_for_subject", - self._get_statistics_for_subject_txn, - stats_type, - stats_id, - start, - size, - ) - - def _get_statistics_for_subject_txn( - self, txn, stats_type, stats_id, start, size=100 - ): - """ - Transaction-bound version of L{get_statistics_for_subject}. - """ - - table, id_col = TYPE_TO_TABLE[stats_type] - selected_columns = list( - ABSOLUTE_STATS_FIELDS[stats_type] + PER_SLICE_FIELDS[stats_type] - ) - - slice_list = self._simple_select_list_paginate_txn( - txn, - table + "_historical", - {id_col: stats_id}, - "end_ts", - start, - size, - retcols=selected_columns + ["bucket_size", "end_ts"], - order_direction="DESC", - ) - - return slice_list - - def get_room_stats_state(self, room_id): - """ - Returns the current room_stats_state for a room. - - Args: - room_id (str): The ID of the room to return state for. - - Returns (dict): - Dictionary containing these keys: - "name", "topic", "canonical_alias", "avatar", "join_rules", - "history_visibility" - """ - return self._simple_select_one( - "room_stats_state", - {"room_id": room_id}, - retcols=( - "name", - "topic", - "canonical_alias", - "avatar", - "join_rules", - "history_visibility", - ), - ) - - @cached() - def get_earliest_token_for_stats(self, stats_type, id): - """ - Fetch the "earliest token". This is used by the room stats delta - processor to ignore deltas that have been processed between the - start of the background task and any particular room's stats - being calculated. - - Returns: - Deferred[int] - """ - table, id_col = TYPE_TO_TABLE[stats_type] - - return self._simple_select_one_onecol( - "%s_current" % (table,), - keyvalues={id_col: id}, - retcol="completed_delta_stream_id", - allow_none=True, - ) - - def bulk_update_stats_delta(self, ts, updates, stream_id): - """Bulk update stats tables for a given stream_id and updates the stats - incremental position. - - Args: - ts (int): Current timestamp in ms - updates(dict[str, dict[str, dict[str, Counter]]]): The updates to - commit as a mapping stats_type -> stats_id -> field -> delta. - stream_id (int): Current position. - - Returns: - Deferred - """ - - def _bulk_update_stats_delta_txn(txn): - for stats_type, stats_updates in updates.items(): - for stats_id, fields in stats_updates.items(): - logger.info( - "Updating %s stats for %s: %s", stats_type, stats_id, fields - ) - self._update_stats_delta_txn( - txn, - ts=ts, - stats_type=stats_type, - stats_id=stats_id, - fields=fields, - complete_with_stream_id=stream_id, - ) - - self._simple_update_one_txn( - txn, - table="stats_incremental_position", - keyvalues={}, - updatevalues={"stream_id": stream_id}, - ) - - return self.runInteraction( - "bulk_update_stats_delta", _bulk_update_stats_delta_txn - ) - - def update_stats_delta( - self, - ts, - stats_type, - stats_id, - fields, - complete_with_stream_id, - absolute_field_overrides=None, - ): - """ - Updates the statistics for a subject, with a delta (difference/relative - change). - - Args: - ts (int): timestamp of the change - stats_type (str): "room" or "user" – the kind of subject - stats_id (str): the subject's ID (room ID or user ID) - fields (dict[str, int]): Deltas of stats values. - complete_with_stream_id (int, optional): - If supplied, converts an incomplete row into a complete row, - with the supplied stream_id marked as the stream_id where the - row was completed. - absolute_field_overrides (dict[str, int]): Current stats values - (i.e. not deltas) of absolute fields. - Does not work with per-slice fields. - """ - - return self.runInteraction( - "update_stats_delta", - self._update_stats_delta_txn, - ts, - stats_type, - stats_id, - fields, - complete_with_stream_id=complete_with_stream_id, - absolute_field_overrides=absolute_field_overrides, - ) - - def _update_stats_delta_txn( - self, - txn, - ts, - stats_type, - stats_id, - fields, - complete_with_stream_id, - absolute_field_overrides=None, - ): - if absolute_field_overrides is None: - absolute_field_overrides = {} - - table, id_col = TYPE_TO_TABLE[stats_type] - - quantised_ts = self.quantise_stats_time(int(ts)) - end_ts = quantised_ts + self.stats_bucket_size - - # Lets be paranoid and check that all the given field names are known - abs_field_names = ABSOLUTE_STATS_FIELDS[stats_type] - slice_field_names = PER_SLICE_FIELDS[stats_type] - for field in chain(fields.keys(), absolute_field_overrides.keys()): - if field not in abs_field_names and field not in slice_field_names: - # guard against potential SQL injection dodginess - raise ValueError( - "%s is not a recognised field" - " for stats type %s" % (field, stats_type) - ) - - # Per slice fields do not get added to the _current table - - # This calculates the deltas (`field = field + ?` values) - # for absolute fields, - # * defaulting to 0 if not specified - # (required for the INSERT part of upserting to work) - # * omitting overrides specified in `absolute_field_overrides` - deltas_of_absolute_fields = { - key: fields.get(key, 0) - for key in abs_field_names - if key not in absolute_field_overrides - } - - # Keep the delta stream ID field up to date - absolute_field_overrides = absolute_field_overrides.copy() - absolute_field_overrides["completed_delta_stream_id"] = complete_with_stream_id - - # first upsert the `_current` table - self._upsert_with_additive_relatives_txn( - txn=txn, - table=table + "_current", - keyvalues={id_col: stats_id}, - absolutes=absolute_field_overrides, - additive_relatives=deltas_of_absolute_fields, - ) - - per_slice_additive_relatives = { - key: fields.get(key, 0) for key in slice_field_names - } - self._upsert_copy_from_table_with_additive_relatives_txn( - txn=txn, - into_table=table + "_historical", - keyvalues={id_col: stats_id}, - extra_dst_insvalues={"bucket_size": self.stats_bucket_size}, - extra_dst_keyvalues={"end_ts": end_ts}, - additive_relatives=per_slice_additive_relatives, - src_table=table + "_current", - copy_columns=abs_field_names, - ) - - def _upsert_with_additive_relatives_txn( - self, txn, table, keyvalues, absolutes, additive_relatives - ): - """Used to update values in the stats tables. - - This is basically a slightly convoluted upsert that *adds* to any - existing rows. - - Args: - txn - table (str): Table name - keyvalues (dict[str, any]): Row-identifying key values - absolutes (dict[str, any]): Absolute (set) fields - additive_relatives (dict[str, int]): Fields that will be added onto - if existing row present. - """ - if self.database_engine.can_native_upsert: - absolute_updates = [ - "%(field)s = EXCLUDED.%(field)s" % {"field": field} - for field in absolutes.keys() - ] - - relative_updates = [ - "%(field)s = EXCLUDED.%(field)s + %(table)s.%(field)s" - % {"table": table, "field": field} - for field in additive_relatives.keys() - ] - - insert_cols = [] - qargs = [] - - for (key, val) in chain( - keyvalues.items(), absolutes.items(), additive_relatives.items() - ): - insert_cols.append(key) - qargs.append(val) - - sql = """ - INSERT INTO %(table)s (%(insert_cols_cs)s) - VALUES (%(insert_vals_qs)s) - ON CONFLICT (%(key_columns)s) DO UPDATE SET %(updates)s - """ % { - "table": table, - "insert_cols_cs": ", ".join(insert_cols), - "insert_vals_qs": ", ".join( - ["?"] * (len(keyvalues) + len(absolutes) + len(additive_relatives)) - ), - "key_columns": ", ".join(keyvalues), - "updates": ", ".join(chain(absolute_updates, relative_updates)), - } - - txn.execute(sql, qargs) - else: - self.database_engine.lock_table(txn, table) - retcols = list(chain(absolutes.keys(), additive_relatives.keys())) - current_row = self._simple_select_one_txn( - txn, table, keyvalues, retcols, allow_none=True - ) - if current_row is None: - merged_dict = {**keyvalues, **absolutes, **additive_relatives} - self._simple_insert_txn(txn, table, merged_dict) - else: - for (key, val) in additive_relatives.items(): - current_row[key] += val - current_row.update(absolutes) - self._simple_update_one_txn(txn, table, keyvalues, current_row) - - def _upsert_copy_from_table_with_additive_relatives_txn( - self, - txn, - into_table, - keyvalues, - extra_dst_keyvalues, - extra_dst_insvalues, - additive_relatives, - src_table, - copy_columns, - ): - """Updates the historic stats table with latest updates. - - This involves copying "absolute" fields from the `_current` table, and - adding relative fields to any existing values. - - Args: - txn: Transaction - into_table (str): The destination table to UPSERT the row into - keyvalues (dict[str, any]): Row-identifying key values - extra_dst_keyvalues (dict[str, any]): Additional keyvalues - for `into_table`. - extra_dst_insvalues (dict[str, any]): Additional values to insert - on new row creation for `into_table`. - additive_relatives (dict[str, any]): Fields that will be added onto - if existing row present. (Must be disjoint from copy_columns.) - src_table (str): The source table to copy from - copy_columns (iterable[str]): The list of columns to copy - """ - if self.database_engine.can_native_upsert: - ins_columns = chain( - keyvalues, - copy_columns, - additive_relatives, - extra_dst_keyvalues, - extra_dst_insvalues, - ) - sel_exprs = chain( - keyvalues, - copy_columns, - ( - "?" - for _ in chain( - additive_relatives, extra_dst_keyvalues, extra_dst_insvalues - ) - ), - ) - keyvalues_where = ("%s = ?" % f for f in keyvalues) - - sets_cc = ("%s = EXCLUDED.%s" % (f, f) for f in copy_columns) - sets_ar = ( - "%s = EXCLUDED.%s + %s.%s" % (f, f, into_table, f) - for f in additive_relatives - ) - - sql = """ - INSERT INTO %(into_table)s (%(ins_columns)s) - SELECT %(sel_exprs)s - FROM %(src_table)s - WHERE %(keyvalues_where)s - ON CONFLICT (%(keyvalues)s) - DO UPDATE SET %(sets)s - """ % { - "into_table": into_table, - "ins_columns": ", ".join(ins_columns), - "sel_exprs": ", ".join(sel_exprs), - "keyvalues_where": " AND ".join(keyvalues_where), - "src_table": src_table, - "keyvalues": ", ".join( - chain(keyvalues.keys(), extra_dst_keyvalues.keys()) - ), - "sets": ", ".join(chain(sets_cc, sets_ar)), - } - - qargs = list( - chain( - additive_relatives.values(), - extra_dst_keyvalues.values(), - extra_dst_insvalues.values(), - keyvalues.values(), - ) - ) - txn.execute(sql, qargs) - else: - self.database_engine.lock_table(txn, into_table) - src_row = self._simple_select_one_txn( - txn, src_table, keyvalues, copy_columns - ) - all_dest_keyvalues = {**keyvalues, **extra_dst_keyvalues} - dest_current_row = self._simple_select_one_txn( - txn, - into_table, - keyvalues=all_dest_keyvalues, - retcols=list(chain(additive_relatives.keys(), copy_columns)), - allow_none=True, - ) - - if dest_current_row is None: - merged_dict = { - **keyvalues, - **extra_dst_keyvalues, - **extra_dst_insvalues, - **src_row, - **additive_relatives, - } - self._simple_insert_txn(txn, into_table, merged_dict) - else: - for (key, val) in additive_relatives.items(): - src_row[key] = dest_current_row[key] + val - self._simple_update_txn(txn, into_table, all_dest_keyvalues, src_row) - - def get_changes_room_total_events_and_bytes(self, min_pos, max_pos): - """Fetches the counts of events in the given range of stream IDs. - - Args: - min_pos (int) - max_pos (int) - - Returns: - Deferred[dict[str, dict[str, int]]]: Mapping of room ID to field - changes. - """ - - return self.runInteraction( - "stats_incremental_total_events_and_bytes", - self.get_changes_room_total_events_and_bytes_txn, - min_pos, - max_pos, - ) - - def get_changes_room_total_events_and_bytes_txn(self, txn, low_pos, high_pos): - """Gets the total_events and total_event_bytes counts for rooms and - senders, in a range of stream_orderings (including backfilled events). - - Args: - txn - low_pos (int): Low stream ordering - high_pos (int): High stream ordering - - Returns: - tuple[dict[str, dict[str, int]], dict[str, dict[str, int]]]: The - room and user deltas for total_events/total_event_bytes in the - format of `stats_id` -> fields - """ - - if low_pos >= high_pos: - # nothing to do here. - return {}, {} - - if isinstance(self.database_engine, PostgresEngine): - new_bytes_expression = "OCTET_LENGTH(json)" - else: - new_bytes_expression = "LENGTH(CAST(json AS BLOB))" - - sql = """ - SELECT events.room_id, COUNT(*) AS new_events, SUM(%s) AS new_bytes - FROM events INNER JOIN event_json USING (event_id) - WHERE (? < stream_ordering AND stream_ordering <= ?) - OR (? <= stream_ordering AND stream_ordering <= ?) - GROUP BY events.room_id - """ % ( - new_bytes_expression, - ) - - txn.execute(sql, (low_pos, high_pos, -high_pos, -low_pos)) - - room_deltas = { - room_id: {"total_events": new_events, "total_event_bytes": new_bytes} - for room_id, new_events, new_bytes in txn - } - - sql = """ - SELECT events.sender, COUNT(*) AS new_events, SUM(%s) AS new_bytes - FROM events INNER JOIN event_json USING (event_id) - WHERE (? < stream_ordering AND stream_ordering <= ?) - OR (? <= stream_ordering AND stream_ordering <= ?) - GROUP BY events.sender - """ % ( - new_bytes_expression, - ) - - txn.execute(sql, (low_pos, high_pos, -high_pos, -low_pos)) - - user_deltas = { - user_id: {"total_events": new_events, "total_event_bytes": new_bytes} - for user_id, new_events, new_bytes in txn - if self.hs.is_mine_id(user_id) - } - - return room_deltas, user_deltas - - @defer.inlineCallbacks - def _calculate_and_set_initial_state_for_room(self, room_id): - """Calculate and insert an entry into room_stats_current. - - Args: - room_id (str) - - Returns: - Deferred[tuple[dict, dict, int]]: A tuple of room state, membership - counts and stream position. - """ - - def _fetch_current_state_stats(txn): - pos = self.get_room_max_stream_ordering() - - rows = self._simple_select_many_txn( - txn, - table="current_state_events", - column="type", - iterable=[ - EventTypes.Create, - EventTypes.JoinRules, - EventTypes.RoomHistoryVisibility, - EventTypes.Encryption, - EventTypes.Name, - EventTypes.Topic, - EventTypes.RoomAvatar, - EventTypes.CanonicalAlias, - ], - keyvalues={"room_id": room_id, "state_key": ""}, - retcols=["event_id"], - ) - - event_ids = [row["event_id"] for row in rows] - - txn.execute( - """ - SELECT membership, count(*) FROM current_state_events - WHERE room_id = ? AND type = 'm.room.member' - GROUP BY membership - """, - (room_id,), - ) - membership_counts = {membership: cnt for membership, cnt in txn} - - txn.execute( - """ - SELECT COALESCE(count(*), 0) FROM current_state_events - WHERE room_id = ? - """, - (room_id,), - ) - - current_state_events_count, = txn.fetchone() - - users_in_room = self.get_users_in_room_txn(txn, room_id) - - return ( - event_ids, - membership_counts, - current_state_events_count, - users_in_room, - pos, - ) - - ( - event_ids, - membership_counts, - current_state_events_count, - users_in_room, - pos, - ) = yield self.runInteraction( - "get_initial_state_for_room", _fetch_current_state_stats - ) - - state_event_map = yield self.get_events(event_ids, get_prev_content=False) - - room_state = { - "join_rules": None, - "history_visibility": None, - "encryption": None, - "name": None, - "topic": None, - "avatar": None, - "canonical_alias": None, - "is_federatable": True, - } - - for event in state_event_map.values(): - if event.type == EventTypes.JoinRules: - room_state["join_rules"] = event.content.get("join_rule") - elif event.type == EventTypes.RoomHistoryVisibility: - room_state["history_visibility"] = event.content.get( - "history_visibility" - ) - elif event.type == EventTypes.Encryption: - room_state["encryption"] = event.content.get("algorithm") - elif event.type == EventTypes.Name: - room_state["name"] = event.content.get("name") - elif event.type == EventTypes.Topic: - room_state["topic"] = event.content.get("topic") - elif event.type == EventTypes.RoomAvatar: - room_state["avatar"] = event.content.get("url") - elif event.type == EventTypes.CanonicalAlias: - room_state["canonical_alias"] = event.content.get("alias") - elif event.type == EventTypes.Create: - room_state["is_federatable"] = ( - event.content.get("m.federate", True) is True - ) - - yield self.update_room_state(room_id, room_state) - - local_users_in_room = [u for u in users_in_room if self.hs.is_mine_id(u)] - - yield self.update_stats_delta( - ts=self.clock.time_msec(), - stats_type="room", - stats_id=room_id, - fields={}, - complete_with_stream_id=pos, - absolute_field_overrides={ - "current_state_events": current_state_events_count, - "joined_members": membership_counts.get(Membership.JOIN, 0), - "invited_members": membership_counts.get(Membership.INVITE, 0), - "left_members": membership_counts.get(Membership.LEAVE, 0), - "banned_members": membership_counts.get(Membership.BAN, 0), - "local_users_in_room": len(local_users_in_room), - }, - ) - - @defer.inlineCallbacks - def _calculate_and_set_initial_state_for_user(self, user_id): - def _calculate_and_set_initial_state_for_user_txn(txn): - pos = self._get_max_stream_id_in_current_state_deltas_txn(txn) - - txn.execute( - """ - SELECT COUNT(distinct room_id) FROM current_state_events - WHERE type = 'm.room.member' AND state_key = ? - AND membership = 'join' - """, - (user_id,), - ) - count, = txn.fetchone() - return count, pos - - joined_rooms, pos = yield self.runInteraction( - "calculate_and_set_initial_state_for_user", - _calculate_and_set_initial_state_for_user_txn, - ) - - yield self.update_stats_delta( - ts=self.clock.time_msec(), - stats_type="user", - stats_id=user_id, - fields={}, - complete_with_stream_id=pos, - absolute_field_overrides={"joined_rooms": joined_rooms}, - ) |