diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py
index 9d6c3027d5..58aa8a7e49 100644
--- a/synapse/storage/stats.py
+++ b/synapse/storage/stats.py
@@ -17,6 +17,12 @@
import logging
from itertools import chain
+from twisted.internet import defer
+
+from synapse.api.constants import EventTypes, Membership
+
+from synapse.storage.engines import Sqlite3Engine
+from synapse.storage.prepare_database import get_statements
from twisted.internet.defer import DeferredLock
from synapse.storage import PostgresEngine
@@ -49,6 +55,9 @@ PER_SLICE_FIELDS = {"room": (), "user": ()}
TYPE_TO_TABLE = {"room": ("room_stats", "room_id"), "user": ("user_stats", "user_id")}
+# these are the tables which contain our actual subjects
+TYPE_TO_ORIGIN_TABLE = {"room": "rooms", "user": "users"}
+
class StatsStore(StateDeltasStore):
def __init__(self, db_conn, hs):
@@ -61,8 +70,18 @@ class StatsStore(StateDeltasStore):
self.stats_delta_processing_lock = DeferredLock()
- self.register_noop_background_update("populate_stats_createtables")
- self.register_noop_background_update("populate_stats_process_rooms")
+ self.register_background_update_handler(
+ "populate_stats_prepare", self._populate_stats_prepare
+ )
+ self.register_background_update_handler(
+ "populate_stats_process_rooms", self._populate_stats_process_rooms
+ )
+ self.register_background_update_handler(
+ "populate_stats_process_users", self._populate_stats_process_users
+ )
+        # we no longer need to perform clean-up, but we keep this handler
+        # registered as a no-op so that clean-up can be reintroduced in the
+        # future; documentation will continue to refer to it.
self.register_noop_background_update("populate_stats_cleanup")
def quantise_stats_time(self, ts):
@@ -81,6 +100,482 @@
"""
return (ts // self.stats_bucket_size) * self.stats_bucket_size
+ @defer.inlineCallbacks
+ def _unwedge_incremental_processor(self, forced_promise):
+ """
+ Make a promise about what this stats regeneration will handle,
+ so that we can allow the incremental processor to start doing things
+ right away – 'unwedging' it.
+
+ Args:
+ forced_promise (dict of positions):
+ If supplied, this is the promise that is made.
+ Otherwise, a promise is made that reduces the amount of work
+ that must be performed by the incremental processor.
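+
+        Example of a promise (these keys are the ones built below; the
+        positions themselves are illustrative only):
+
+            {
+                "state_delta_stream_id": 1234,
+                "total_events_min_stream_ordering": -5,
+                "total_events_max_stream_ordering": 56789,
+            }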
+ """
+
+ if forced_promise is None:
+ promised_stats_delta_pos = (
+ yield self.get_max_stream_id_in_current_state_deltas()
+ )
+ promised_max = self.get_room_max_stream_ordering()
+ promised_min = self.get_room_min_stream_ordering()
+
+ promised_positions = {
+ "state_delta_stream_id": promised_stats_delta_pos,
+ "total_events_min_stream_ordering": promised_min,
+ "total_events_max_stream_ordering": promised_max,
+ }
+ else:
+ promised_positions = forced_promise
+
+        # store the promise so that the initial processors
+        # (_populate_stats_process_*) can read it back later
+ yield self.update_stats_positions(
+ promised_positions, for_initial_processor=True
+ )
+
+ # this unwedges the incremental processor
+ yield self.update_stats_positions(
+ promised_positions, for_initial_processor=False
+ )
+
+ # with the delta processor unwedged, now let it catch up in case
+ # anything was missed during the wedge period
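+        # (call_later(0, ...) schedules the notification for the next
+        # reactor tick, so it does not run inside the current call stack)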
+ self.clock.call_later(0, self.hs.get_stats_handler().notify_new_event)
+
+ @defer.inlineCallbacks
+ def _populate_stats_prepare(self, progress, batch_size):
+ """
+        This is a background update which prepares the database for
+        statistics regeneration: it wedges the incremental processor,
+        deletes any incomplete ('dirty') rows left over from a previous
+        attempt, unwedges the incremental processor, and then inserts
+        fresh skeleton rows for every room and user.
+ """
+
+ if not self.stats_enabled:
+ yield self._end_background_update("populate_stats_prepare")
+ return 1
+
+ def _wedge_incremental_processor(txn):
+ """
+ Wedge the incremental processor (by setting its positions to NULL),
+            and atomically return its previous positions.
+ """
+
+ with self.stats_delta_processing_lock:
+ old = self._get_stats_positions_txn(txn, for_initial_processor=False)
+ self._update_stats_positions_txn(txn, None, for_initial_processor=False)
+
+ return old
+
+ def _make_skeletons(txn, stats_type):
+ """
+ Get all the rooms and users that we want to process, and create
+ 'skeletons' (incomplete _stats_current rows) for them, if they do
+ not already have a row.
+ """
+
+ if isinstance(self.database_engine, Sqlite3Engine):
+ sql = """
+ INSERT OR IGNORE INTO %(table)s_current
+ (%(id_col)s, completed_delta_stream_id, %(zero_cols)s)
+ SELECT %(id_col)s, NULL, %(zeroes)s FROM %(origin_table)s
+ """
+ else:
+ sql = """
+ INSERT INTO %(table)s_current
+ (%(id_col)s, completed_delta_stream_id, %(zero_cols)s)
+ SELECT %(id_col)s, NULL, %(zeroes)s FROM %(origin_table)s
+ ON CONFLICT DO NOTHING
+ """
+
+ table, id_col = TYPE_TO_TABLE[stats_type]
+ origin_table = TYPE_TO_ORIGIN_TABLE[stats_type]
+            zero_cols = list(
+                chain(ABSOLUTE_STATS_FIELDS[stats_type], PER_SLICE_FIELDS[stats_type])
+            )
+
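+            # As an illustration (not the literal output), for stats_type
+            # "room" this renders to SQL of roughly this shape, with the
+            # column list drawn from ABSOLUTE_STATS_FIELDS/PER_SLICE_FIELDS:
+            #
+            #   INSERT INTO room_stats_current
+            #       (room_id, completed_delta_stream_id, joined_members, ...)
+            #   SELECT room_id, NULL, 0, ... FROM rooms
+            #   ON CONFLICT DO NOTHING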
+ txn.execute(sql % {
+ "table": table,
+ "id_col": id_col,
+ "origin_table": origin_table,
+ "zero_cols": zero_cols,
+ "zeroes": ", ".join(["0"] * len(zero_cols))
+ })
+
+ def _delete_dirty_skeletons(txn):
+ """
+ Delete pre-existing rows which are incomplete.
+ """
+ sql = """
+ DELETE FROM %s_current
+ WHERE completed_delta_stream_id IS NULL
+ """
+
+            for (table, id_col) in TYPE_TO_TABLE.values():
+ txn.execute(sql % (table,))
+
+ # first wedge the incremental processor and reset our promise
+ old_positions = yield self.runInteraction(
+ "populate_stats_wedge", _wedge_incremental_processor
+ )
+
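+        # if any of the old positions were NULL, the incremental processor
+        # was already wedged (e.g. by a previous regeneration that was
+        # interrupted), so there is no old promise to preserve and we
+        # must make a fresh one.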
+ if None in old_positions.values():
+ old_positions = None
+
+ # with the incremental processor wedged, we delete dirty skeleton rows
+ # since we don't want to double-count them.
+ yield self.runInteraction(
+ "populate_stats_delete_dirty_skeletons", _delete_dirty_skeletons
+ )
+
+ yield self._unwedge_incremental_processor(old_positions)
+
+ yield self.runInteraction("populate_stats_make_skeletons", _make_skeletons)
+ self.get_earliest_token_for_stats.invalidate_all()
+
+ yield self._end_background_update("populate_stats_prepare")
+ return 1
+
+ @defer.inlineCallbacks
+ def _populate_stats_process_users(self, progress, batch_size):
+ """
+ This is a background update which regenerates statistics for users.
+ """
+ if not self.stats_enabled:
+ yield self._end_background_update("populate_stats_process_users")
+ return 1
+
+ def _get_next_batch(txn):
+            # Only fetch 250 users, so we don't fetch too many at once, even
+            # if those 250 users have fewer than batch_size memberships.
+ sql = """
+ SELECT user_id FROM user_stats_current
+ WHERE completed_delta_stream_id IS NULL
+ LIMIT 250
+ """
+ txn.execute(sql)
+ users_to_work_on = txn.fetchall()
+
+ if not users_to_work_on:
+ return None
+
+ # Get how many are left to process, so we can give status on how
+ # far we are in processing
+ txn.execute(
+ "SELECT COUNT(*) FROM room_stats_current"
+ " WHERE completed_delta_stream_id IS NULL"
+ )
+ progress["remaining"] = txn.fetchone()[0]
+
+ return users_to_work_on
+
+ users_to_work_on = yield self.runInteraction(
+ "populate_stats_users_get_batch", _get_next_batch
+ )
+
+        # No more users -- mark the background update as complete.
+ if not users_to_work_on:
+ yield self._end_background_update("populate_stats_process_users")
+ return 1
+
+ logger.info(
+ "Processing the next %d users of %d remaining",
+ len(users_to_work_on),
+ progress["remaining"],
+ )
+
+ processed_membership_count = 0
+
+ promised_positions = yield self.get_stats_positions(for_initial_processor=True)
+
+        if None in promised_positions.values():
+ logger.error(
+ "There is a None in promised_positions;"
+ " dependency task must not have been run."
+ " promised_positions: %r",
+ promised_positions,
+ )
+ yield self._end_background_update("populate_stats_process_users")
+ return 1
+
+ for (user_id,) in users_to_work_on:
+            now = self.clock.time_msec()
+
+ def _process_user(txn):
+ # Get the current token
+ current_token = self._get_max_stream_id_in_current_state_deltas_txn(txn)
+
+ sql = """
+ SELECT
+ (
+ join_rules = 'public'
+ OR history_visibility = 'world_readable'
+ ) AS is_public,
+ COUNT(*) AS count
+ FROM room_memberships
+ JOIN room_state USING (room_id)
+ WHERE
+ user_id = ? AND membership = 'join'
+ GROUP BY is_public
+ """
+ txn.execute(sql, (user_id,))
+ room_counts_by_publicness = dict(txn.fetchall())
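+                # e.g. {True: 3, False: 2} would mean the user has joined
+                # 3 public rooms and 2 private rooms (illustrative values)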
+
+ self._update_stats_delta_txn(
+ txn,
+ now,
+ "user",
+ user_id,
+ {},
+ complete_with_stream_id=current_token,
+ absolute_field_overrides={
+                    # these are counted absolutely rather than from the
+                    # promised position: counting them from that position
+                    # would be difficult, whereas counting them now can
+                    # use the quick look-up tables.
+ "public_rooms": room_counts_by_publicness.get(True, 0),
+ "private_rooms": room_counts_by_publicness.get(False, 0),
+ },
+ )
+
+ # we use this count for rate-limiting
+ return sum(room_counts_by_publicness.values())
+
+ processed_membership_count += yield self.runInteraction(
+ "update_user_stats", _process_user
+ )
+
+ # Update the remaining counter.
+ progress["remaining"] -= 1
+
+ if processed_membership_count > batch_size:
+ # Don't process any more users, we've hit our batch size.
+ return processed_membership_count
+
+ yield self.runInteraction(
+ "populate_stats",
+ self._background_update_progress_txn,
+ "populate_stats_process_users",
+ progress,
+ )
+
+ return processed_membership_count
+
+ @defer.inlineCallbacks
+ def _populate_stats_process_rooms(self, progress, batch_size):
+ """
+ This is a background update which regenerates statistics for rooms.
+ """
+ if not self.stats_enabled:
+ yield self._end_background_update("populate_stats_process_rooms")
+ return 1
+
+ def _get_next_batch(txn):
+            # Only fetch 250 rooms, so we don't fetch too many at once, even
+            # if those 250 rooms have fewer than batch_size events between them.
+ sql = """
+ SELECT room_id FROM room_stats_current
+ WHERE completed_delta_stream_id IS NULL
+ LIMIT 250
+ """
+ txn.execute(sql)
+ rooms_to_work_on = txn.fetchall()
+
+ if not rooms_to_work_on:
+ return None
+
+ # Get how many are left to process, so we can give status on how
+ # far we are in processing
+ txn.execute(
+ "SELECT COUNT(*) FROM room_stats_current"
+ " WHERE completed_delta_stream_id IS NULL"
+ )
+ progress["remaining"] = txn.fetchone()[0]
+
+ return rooms_to_work_on
+
+ rooms_to_work_on = yield self.runInteraction(
+ "populate_stats_rooms_get_batch", _get_next_batch
+ )
+
+        # No more rooms -- mark the background update as complete.
+ if not rooms_to_work_on:
+ yield self._end_background_update("populate_stats_process_rooms")
+ return 1
+
+ logger.info(
+ "Processing the next %d rooms of %d remaining",
+ len(rooms_to_work_on),
+ progress["remaining"],
+ )
+
+        # Number of events we've processed by going through each room
+ processed_event_count = 0
+
+ promised_positions = yield self.get_stats_positions(for_initial_processor=True)
+
+        if None in promised_positions.values():
+ logger.error(
+ "There is a None in promised_positions;"
+ " dependency task must not have been run."
+ " promised_positions: %s",
+ promised_positions,
+ )
+ yield self._end_background_update("populate_stats_process_rooms")
+ return 1
+
+ for (room_id,) in rooms_to_work_on:
+ current_state_ids = yield self.get_current_state_ids(room_id)
+
+ join_rules_id = current_state_ids.get((EventTypes.JoinRules, ""))
+ history_visibility_id = current_state_ids.get(
+ (EventTypes.RoomHistoryVisibility, "")
+ )
+ encryption_id = current_state_ids.get((EventTypes.RoomEncryption, ""))
+ name_id = current_state_ids.get((EventTypes.Name, ""))
+ topic_id = current_state_ids.get((EventTypes.Topic, ""))
+ avatar_id = current_state_ids.get((EventTypes.RoomAvatar, ""))
+ canonical_alias_id = current_state_ids.get((EventTypes.CanonicalAlias, ""))
+
+ event_ids = [
+ join_rules_id,
+ history_visibility_id,
+ encryption_id,
+ name_id,
+ topic_id,
+ avatar_id,
+ canonical_alias_id,
+ ]
+
+ state_events = yield self.get_events(
+ [ev for ev in event_ids if ev is not None]
+ )
+
+ def _get_or_none(event_id, arg):
+ event = state_events.get(event_id)
+ if event:
+ return event.content.get(arg)
+ return None
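+            # e.g. _get_or_none(name_id, "name") returns content["name"]
+            # from the room's m.room.name event, or None if there is no
+            # such event or no such key in its content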
+
+ yield self.update_room_state(
+ room_id,
+ {
+ "join_rules": _get_or_none(join_rules_id, "join_rule"),
+ "history_visibility": _get_or_none(
+ history_visibility_id, "history_visibility"
+ ),
+ "encryption": _get_or_none(encryption_id, "algorithm"),
+ "name": _get_or_none(name_id, "name"),
+ "topic": _get_or_none(topic_id, "topic"),
+ "avatar": _get_or_none(avatar_id, "url"),
+ "canonical_alias": _get_or_none(canonical_alias_id, "alias"),
+ },
+ )
+
+ now = self.clock.time_msec()
+
+ def _fetch_data(txn):
+ # Get the current token of the room
+ current_token = self._get_max_stream_id_in_current_state_deltas_txn(txn)
+
+ current_state_events = len(current_state_ids)
+
+ membership_counts = self._get_user_counts_in_room_txn(txn, room_id)
+
+ room_total_event_count, room_total_event_bytes = self._count_events_and_bytes_in_room_txn(
+ txn,
+ room_id,
+ promised_positions["total_events_min_stream_ordering"],
+ promised_positions["total_events_max_stream_ordering"],
+ )
+
+ self._update_stats_delta_txn(
+ txn,
+ now,
+ "room",
+ room_id,
+ {
+ "total_events": room_total_event_count,
+ "total_event_bytes": room_total_event_bytes
+ },
+ complete_with_stream_id=current_token,
+ absolute_field_overrides={
+                        # these are counted absolutely rather than from the
+                        # promised position: counting them from that position
+                        # would be difficult, whereas counting them now can
+                        # use the quick look-up tables.
+ "current_state_events": current_state_events,
+ "joined_members": membership_counts.get(Membership.JOIN, 0),
+ "invited_members": membership_counts.get(
+ Membership.INVITE, 0
+ ),
+ "left_members": membership_counts.get(Membership.LEAVE, 0),
+ "banned_members": membership_counts.get(Membership.BAN, 0),
+ },
+ )
+
+ # we use this count for rate-limiting
+ return room_total_event_count
+
+ room_event_count = yield self.runInteraction(
+ "update_room_stats", _fetch_data
+ )
+
+ # Update the remaining counter.
+ progress["remaining"] -= 1
+
+ processed_event_count += room_event_count
+
+ if processed_event_count > batch_size:
+ # Don't process any more rooms, we've hit our batch size.
+ return processed_event_count
+
+ yield self.runInteraction(
+ "populate_stats",
+ self._background_update_progress_txn,
+ "populate_stats_process_rooms",
+ progress,
+ )
+
+ return processed_event_count
+
def get_stats_positions(self, for_initial_processor=False):
"""
Returns the stats processor positions.
@@ -555,7 +1050,7 @@
# nothing to do here.
return
- now = self.hs.clock.time_msec()
+ now = self.clock.time_msec()
# we choose comparators based on the signs
low_comparator = "<=" if low_pos < 0 else "<"
@@ -587,3 +1082,37 @@
room_id,
{"total_events": new_events, "total_event_bytes": new_bytes},
)
+
+ def _count_events_and_bytes_in_room_txn(self, txn, room_id, low_token, high_token):
+ """
+        Count the number of events and event bytes in a room between two
+        stream ordering tokens, inclusive of both endpoints.
+
+        Args:
+            txn (cursor): The database transaction
+            room_id (str): The ID of the room to count events for
+            low_token (int): the minimum stream ordering to count
+            high_token (int): the maximum stream ordering to count
+
+        Returns (tuple[int, int]):
+            the number of events, and the number of bytes used by those
+            events' event JSON
+ """
+
+ if isinstance(self.database_engine, PostgresEngine):
+ bytes_expression = "OCTET_LENGTH(json)"
+ else:
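+            # SQLite's LENGTH() counts characters on text values;
+            # casting to BLOB makes it count bytes instead.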
+ bytes_expression = "LENGTH(CAST(json AS BLOB))"
+
+ sql = """
+ SELECT COUNT(*) AS num_events, SUM(%s) AS num_bytes
+ FROM events
+ JOIN event_json USING (event_id)
+ WHERE room_id = ?
+ AND ? <= stream_ordering
+ AND stream_ordering <= ?
+ """ % (bytes_expression,)
+ txn.execute(sql, (room_id, low_token, high_token))
+ return txn.fetchone()