summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--synapse/storage/stats.py507
1 files changed, 504 insertions, 3 deletions
diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py
index 20ce3664a0..24f739a017 100644
--- a/synapse/storage/stats.py
+++ b/synapse/storage/stats.py
@@ -17,9 +17,12 @@
 import logging
 from itertools import chain
 
+from twisted.internet import defer
 from twisted.internet.defer import DeferredLock
 
+from synapse.api.constants import EventTypes, Membership
 from synapse.storage import PostgresEngine
+from synapse.storage.engines import Sqlite3Engine
 from synapse.storage.state_deltas import StateDeltasStore
 from synapse.util.caches.descriptors import cached
 
@@ -49,6 +52,9 @@ PER_SLICE_FIELDS = {"room": (), "user": ()}
 
 TYPE_TO_TABLE = {"room": ("room_stats", "room_id"), "user": ("user_stats", "user_id")}
 
+# these are the tables (& ID columns) which contain our actual subjects
+TYPE_TO_ORIGIN_TABLE = {"room": ("rooms", "room_id"), "user": ("users", "name")}
+
 
 class StatsStore(StateDeltasStore):
     def __init__(self, db_conn, hs):
@@ -61,8 +67,18 @@ class StatsStore(StateDeltasStore):
 
         self.stats_delta_processing_lock = DeferredLock()
 
-        self.register_noop_background_update("populate_stats_createtables")
-        self.register_noop_background_update("populate_stats_process_rooms")
+        self.register_background_update_handler(
+            "populate_stats_prepare", self._populate_stats_prepare
+        )
+        self.register_background_update_handler(
+            "populate_stats_process_rooms", self._populate_stats_process_rooms
+        )
+        self.register_background_update_handler(
+            "populate_stats_process_users", self._populate_stats_process_users
+        )
+        # we no longer need to perform clean-up, but we will give ourselves
+        # the potential to reintroduce it in the future – so documentation
+        # will still encourage the use of this no-op handler.
         self.register_noop_background_update("populate_stats_cleanup")
 
     def quantise_stats_time(self, ts):
@@ -81,6 +97,456 @@ class StatsStore(StateDeltasStore):
         """
         return (ts // self.stats_bucket_size) * self.stats_bucket_size
 
+    @defer.inlineCallbacks
+    def _unwedge_incremental_processor(self, forced_promise):
+        """
+        Make a promise about what this stats regeneration will handle,
+        so that we can allow the incremental processor to start doing things
+        right away – 'unwedging' it.
+
+        Args:
+            forced_promise (dict of positions):
+                If supplied, this is the promise that is made.
+                Otherwise, a promise is made that reduces the amount of work
+                that must be performed by the incremental processor.
+        """
+
+        if forced_promise is None:
+            promised_stats_delta_pos = (
+                yield self.get_max_stream_id_in_current_state_deltas()
+            )
+            promised_max = self.get_room_max_stream_ordering()
+            promised_min = self.get_room_min_stream_ordering()
+
+            promised_positions = {
+                "state_delta_stream_id": promised_stats_delta_pos,
+                "total_events_min_stream_ordering": promised_min,
+                "total_events_max_stream_ordering": promised_max,
+            }
+        else:
+            promised_positions = forced_promise
+
+        # this stores it for our reference later
+        yield self.update_stats_positions(
+            promised_positions, for_initial_processor=True
+        )
+
+        # this unwedges the incremental processor
+        yield self.update_stats_positions(
+            promised_positions, for_initial_processor=False
+        )
+
+        # with the delta processor unwedged, now let it catch up in case
+        # anything was missed during the wedge period
+        self.clock.call_later(0, self.hs.get_stats_handler().notify_new_event)
+
+    @defer.inlineCallbacks
+    def _populate_stats_prepare(self, progress, batch_size):
+        """
+        This is a background update, which prepares the database for
+        statistics regeneration.
+        """
+
+        if not self.stats_enabled:
+            yield self._end_background_update("populate_stats_prepare")
+            return 1
+
+        def _wedge_incremental_processor(txn):
+            """
+            Wedge the incremental processor (by setting its positions to NULL),
+            and return its previous positions – atomically.
+            """
+
+            old = self._get_stats_positions_txn(txn, for_initial_processor=False)
+            self._update_stats_positions_txn(txn, None, for_initial_processor=False)
+
+            return old
+
+        def _make_skeletons(txn, stats_type):
+            """
+            Get all the rooms and users that we want to process, and create
+            'skeletons' (incomplete _stats_current rows) for them, if they do
+            not already have a row.
+            """
+
+            if isinstance(self.database_engine, Sqlite3Engine):
+                sql = """
+                        INSERT OR IGNORE INTO %(table)s_current
+                        (%(id_col)s, completed_delta_stream_id, %(zero_cols)s)
+                        SELECT %(origin_id_col)s, NULL, %(zeroes)s FROM %(origin_table)s
+                    """
+            else:
+                sql = """
+                        INSERT INTO %(table)s_current
+                        (%(id_col)s, completed_delta_stream_id, %(zero_cols)s)
+                        SELECT %(origin_id_col)s, NULL, %(zeroes)s FROM %(origin_table)s
+                        ON CONFLICT DO NOTHING
+                    """
+
+            table, id_col = TYPE_TO_TABLE[stats_type]
+            origin_table, origin_id_col = TYPE_TO_ORIGIN_TABLE[stats_type]
+            zero_cols = list(
+                chain(ABSOLUTE_STATS_FIELDS[stats_type], PER_SLICE_FIELDS[stats_type])
+            )
+
+            txn.execute(
+                sql
+                % {
+                    "table": table,
+                    "id_col": id_col,
+                    "origin_id_col": origin_id_col,
+                    "origin_table": origin_table,
+                    "zero_cols": ", ".join(zero_cols),
+                    "zeroes": ", ".join(["0"] * len(zero_cols)),
+                }
+            )
+
+        def _delete_dirty_skeletons(txn):
+            """
+            Delete pre-existing rows which are incomplete.
+            """
+            sql = """
+                    DELETE FROM %s_current
+                    WHERE completed_delta_stream_id IS NULL
+            """
+
+            for _k, (table, id_col) in TYPE_TO_TABLE.items():
+                txn.execute(sql % (table,))
+
+        # first wedge the incremental processor and reset our promise
+        yield self.stats_delta_processing_lock.acquire()
+        try:
+            old_positions = yield self.runInteraction(
+                "populate_stats_wedge", _wedge_incremental_processor
+            )
+        finally:
+            yield self.stats_delta_processing_lock.release()
+
+        if None in old_positions.values():
+            old_positions = None
+
+        # with the incremental processor wedged, we delete dirty skeleton rows
+        # since we don't want to double-count them.
+        yield self.runInteraction(
+            "populate_stats_delete_dirty_skeletons", _delete_dirty_skeletons
+        )
+
+        yield self._unwedge_incremental_processor(old_positions)
+
+        yield self.runInteraction(
+            "populate_stats_make_skeletons", _make_skeletons, "room"
+        )
+        yield self.runInteraction(
+            "populate_stats_make_skeletons", _make_skeletons, "user"
+        )
+        self.get_earliest_token_for_stats.invalidate_all()
+
+        yield self._end_background_update("populate_stats_prepare")
+        return 1
+
+    @defer.inlineCallbacks
+    def _populate_stats_process_users(self, progress, batch_size):
+        """
+        This is a background update which regenerates statistics for users.
+        """
+        if not self.stats_enabled:
+            yield self._end_background_update("populate_stats_process_users")
+            return 1
+
+        def _get_next_batch(txn):
+            # Only fetch 250 users, so we don't fetch too many at once, even
+            # if those 250 users have less than batch_size state events.
+            sql = """
+                    SELECT user_id FROM user_stats_current
+                    WHERE completed_delta_stream_id IS NULL
+                    LIMIT 250
+                """
+            txn.execute(sql)
+            users_to_work_on = txn.fetchall()
+
+            if not users_to_work_on:
+                return None
+
+            # Get how many are left to process, so we can give status on how
+            # far we are in processing
+            sql = """
+                    SELECT COUNT(*) FROM user_stats_current
+                    WHERE completed_delta_stream_id IS NULL
+                """
+            txn.execute(sql)
+            progress["remaining"] = txn.fetchone()[0]
+
+            return users_to_work_on
+
+        users_to_work_on = yield self.runInteraction(
+            "populate_stats_users_get_batch", _get_next_batch
+        )
+
+        # No more users -- complete the transaction.
+        if not users_to_work_on:
+            yield self._end_background_update("populate_stats_process_users")
+            return 1
+
+        logger.info(
+            "Processing the next %d users of %d remaining",
+            len(users_to_work_on),
+            progress["remaining"],
+        )
+
+        processed_membership_count = 0
+
+        promised_positions = yield self.get_stats_positions(for_initial_processor=True)
+
+        if None in promised_positions:
+            logger.error(
+                "There is a None in promised_positions;"
+                " dependency task must not have been run."
+                " promised_positions: %r",
+                promised_positions,
+            )
+            yield self._end_background_update("populate_stats_process_users")
+            return 1
+
+        for (user_id,) in users_to_work_on:
+            now = self.clock.time_msec()
+
+            def _process_user(txn):
+                # Get the current token
+                current_token = self._get_max_stream_id_in_current_state_deltas_txn(txn)
+
+                sql = """
+                        SELECT
+                            (
+                                join_rules = 'public'
+                                OR history_visibility = 'world_readable'
+                            ) AS is_public,
+                            COUNT(*) AS count
+                        FROM room_memberships
+                        JOIN room_stats_state USING (room_id)
+                        WHERE
+                            user_id = ? AND membership = 'join'
+                        GROUP BY is_public
+                    """
+                txn.execute(sql, (user_id,))
+                room_counts_by_publicness = dict(txn.fetchall())
+
+                self._update_stats_delta_txn(
+                    txn,
+                    now,
+                    "user",
+                    user_id,
+                    {},
+                    complete_with_stream_id=current_token,
+                    absolute_field_overrides={
+                        # these are counted absolutely because it is
+                        # more difficult to count them from the promised time,
+                        # because counting them now can use the quick lookup
+                        # tables.
+                        "public_rooms": room_counts_by_publicness.get(True, 0),
+                        "private_rooms": room_counts_by_publicness.get(False, 0),
+                    },
+                )
+
+                # we use this count for rate-limiting
+                return sum(room_counts_by_publicness.values())
+
+            processed_membership_count += yield self.runInteraction(
+                "update_user_stats", _process_user
+            )
+
+            # Update the remaining counter.
+            progress["remaining"] -= 1
+
+            if processed_membership_count > batch_size:
+                # Don't process any more users, we've hit our batch size.
+                return processed_membership_count
+
+        yield self.runInteraction(
+            "populate_stats",
+            self._background_update_progress_txn,
+            "populate_stats_process_users",
+            progress,
+        )
+
+        return processed_membership_count
+
+    @defer.inlineCallbacks
+    def _populate_stats_process_rooms(self, progress, batch_size):
+        """
+        This is a background update which regenerates statistics for rooms.
+        """
+        if not self.stats_enabled:
+            yield self._end_background_update("populate_stats_process_rooms")
+            return 1
+
+        def _get_next_batch(txn):
+            # Only fetch 250 rooms, so we don't fetch too many at once, even
+            # if those 250 rooms have less than batch_size state events.
+            sql = """
+                    SELECT room_id FROM room_stats_current
+                    WHERE completed_delta_stream_id IS NULL
+                    LIMIT 250
+                """
+            txn.execute(sql)
+            rooms_to_work_on = txn.fetchall()
+
+            if not rooms_to_work_on:
+                return None
+
+            # Get how many are left to process, so we can give status on how
+            # far we are in processing
+            sql = """
+                    SELECT COUNT(*) FROM room_stats_current
+                    WHERE completed_delta_stream_id IS NULL
+                """
+            txn.execute(sql)
+            progress["remaining"] = txn.fetchone()[0]
+
+            return rooms_to_work_on
+
+        rooms_to_work_on = yield self.runInteraction(
+            "populate_stats_rooms_get_batch", _get_next_batch
+        )
+
+        # No more rooms -- complete the transaction.
+        if not rooms_to_work_on:
+            yield self._end_background_update("populate_stats_process_rooms")
+            return 1
+
+        logger.info(
+            "Processing the next %d rooms of %d remaining",
+            len(rooms_to_work_on),
+            progress["remaining"],
+        )
+
+        # Number of state events we've processed by going through each room
+        processed_event_count = 0
+
+        promised_positions = yield self.get_stats_positions(for_initial_processor=True)
+
+        if None in promised_positions:
+            logger.error(
+                "There is a None in promised_positions;"
+                " dependency task must not have been run."
+                " promised_positions: %s",
+                promised_positions,
+            )
+            yield self._end_background_update("populate_stats_process_rooms")
+            return 1
+
+        for (room_id,) in rooms_to_work_on:
+            current_state_ids = yield self.get_current_state_ids(room_id)
+
+            join_rules_id = current_state_ids.get((EventTypes.JoinRules, ""))
+            history_visibility_id = current_state_ids.get(
+                (EventTypes.RoomHistoryVisibility, "")
+            )
+            encryption_id = current_state_ids.get((EventTypes.RoomEncryption, ""))
+            name_id = current_state_ids.get((EventTypes.Name, ""))
+            topic_id = current_state_ids.get((EventTypes.Topic, ""))
+            avatar_id = current_state_ids.get((EventTypes.RoomAvatar, ""))
+            canonical_alias_id = current_state_ids.get((EventTypes.CanonicalAlias, ""))
+
+            event_ids = [
+                join_rules_id,
+                history_visibility_id,
+                encryption_id,
+                name_id,
+                topic_id,
+                avatar_id,
+                canonical_alias_id,
+            ]
+
+            state_events = yield self.get_events(
+                [ev for ev in event_ids if ev is not None]
+            )
+
+            def _get_or_none(event_id, arg):
+                event = state_events.get(event_id)
+                if event:
+                    return event.content.get(arg)
+                return None
+
+            yield self.update_room_state(
+                room_id,
+                {
+                    "join_rules": _get_or_none(join_rules_id, "join_rule"),
+                    "history_visibility": _get_or_none(
+                        history_visibility_id, "history_visibility"
+                    ),
+                    "encryption": _get_or_none(encryption_id, "algorithm"),
+                    "name": _get_or_none(name_id, "name"),
+                    "topic": _get_or_none(topic_id, "topic"),
+                    "avatar": _get_or_none(avatar_id, "url"),
+                    "canonical_alias": _get_or_none(canonical_alias_id, "alias"),
+                },
+            )
+
+            now = self.clock.time_msec()
+
+            def _fetch_data(txn):
+                # Get the current token of the room
+                current_token = self._get_max_stream_id_in_current_state_deltas_txn(txn)
+
+                current_state_events = len(current_state_ids)
+
+                membership_counts = self._get_user_counts_in_room_txn(txn, room_id)
+
+                room_total_event_count, room_total_event_bytes = self._count_events_and_bytes_in_room_txn(
+                    txn,
+                    room_id,
+                    promised_positions["total_events_min_stream_ordering"],
+                    promised_positions["total_events_max_stream_ordering"],
+                )
+
+                self._update_stats_delta_txn(
+                    txn,
+                    now,
+                    "room",
+                    room_id,
+                    {
+                        "total_events": room_total_event_count,
+                        "total_event_bytes": room_total_event_bytes,
+                    },
+                    complete_with_stream_id=current_token,
+                    absolute_field_overrides={
+                        # these are counted absolutely because it is
+                        # more difficult to count them from the promised time,
+                        # because counting them now can use the quick lookup
+                        # tables.
+                        "current_state_events": current_state_events,
+                        "joined_members": membership_counts.get(Membership.JOIN, 0),
+                        "invited_members": membership_counts.get(Membership.INVITE, 0),
+                        "left_members": membership_counts.get(Membership.LEAVE, 0),
+                        "banned_members": membership_counts.get(Membership.BAN, 0),
+                    },
+                )
+
+                # we use this count for rate-limiting
+                return room_total_event_count
+
+            room_event_count = yield self.runInteraction(
+                "update_room_stats", _fetch_data
+            )
+
+            # Update the remaining counter.
+            progress["remaining"] -= 1
+
+            processed_event_count += room_event_count
+
+            if processed_event_count > batch_size:
+                # Don't process any more rooms, we've hit our batch size.
+                return processed_event_count
+
+        yield self.runInteraction(
+            "populate_stats",
+            self._background_update_progress_txn,
+            "populate_stats_process_rooms",
+            progress,
+        )
+
+        return processed_event_count
+
     def get_stats_positions(self, for_initial_processor=False):
         """
         Returns the stats processor positions.
@@ -581,7 +1047,7 @@ class StatsStore(StateDeltasStore):
             # nothing to do here.
             return
 
-        now = self.hs.clock.time_msec()
+        now = self.clock.time_msec()
 
         # we choose comparators based on the signs
         low_comparator = "<=" if low_pos < 0 else "<"
@@ -613,3 +1079,38 @@ class StatsStore(StateDeltasStore):
                 room_id,
                 {"total_events": new_events, "total_event_bytes": new_bytes},
             )
+
+    def _count_events_and_bytes_in_room_txn(self, txn, room_id, low_token, high_token):
+        """
+        Count the number of events and event bytes in a room between two tokens,
+        inclusive.
+        Args:
+            txn (cursor): The database
+            room_id (str): The ID of the room to count events for
+            low_token (int): the minimum stream ordering to count
+            high_token (int): the maximum stream ordering to count
+
+        Returns (tuple[int, int]):
+            First element (int):
+                the number of events
+            Second element (int):
+                the number of bytes in events' event JSON
+        """
+
+        if isinstance(self.database_engine, PostgresEngine):
+            bytes_expression = "OCTET_LENGTH(json)"
+        else:
+            bytes_expression = "LENGTH(CAST(json AS BLOB))"
+
+        sql = """
+            SELECT COUNT(*) AS num_events, SUM(%s) AS num_bytes
+            FROM events
+            JOIN event_json USING (event_id)
+            WHERE events.room_id = ?
+                AND ? <= stream_ordering
+                AND stream_ordering <= ?
+        """ % (
+            bytes_expression,
+        )
+        txn.execute(sql, (room_id, low_token, high_token))
+        return txn.fetchone()