diff options
author | Olivier Wilkinson (reivilibre) <olivier@librepush.net> | 2019-08-20 14:08:02 +0100 |
---|---|---|
committer | Olivier Wilkinson (reivilibre) <olivier@librepush.net> | 2019-08-20 14:08:02 +0100 |
commit | 80a1c6e9e5e4fbaa58355559e42a9a1bbc91c81f (patch) | |
tree | 87e3c28d9a63fd0065a8b9f614a7c0a0f6283059 /synapse/storage | |
parent | Add schema for Separated Statistics (diff) | |
download | synapse-80a1c6e9e5e4fbaa58355559e42a9a1bbc91c81f.tar.xz |
Add storage function for storing stats deltas
Old collection is not included in this commit.

Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
Diffstat (limited to 'synapse/storage')
-rw-r--r-- | synapse/storage/stats.py | 171 |
1 files changed, 169 insertions, 2 deletions
diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py index 123c1ae220..e8b1ce240b 100644 --- a/synapse/storage/stats.py +++ b/synapse/storage/stats.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- # Copyright 2018, 2019 New Vector Ltd +# Copyright 2019 The Matrix.org Foundation C.I.C. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -16,6 +17,7 @@ import logging from synapse.storage.state_deltas import StateDeltasStore +from twisted.internet import defer logger = logging.getLogger(__name__) @@ -27,12 +29,21 @@ ABSOLUTE_STATS_FIELDS = { "invited_members", "left_members", "banned_members", - "state_events", + "total_events", ), "user": ("public_rooms", "private_rooms"), } -TYPE_TO_ROOM = {"room": ("room_stats", "room_id"), "user": ("user_stats", "user_id")} +# these fields are per-timeslice and so should be reset to 0 upon a new slice +PER_SLICE_FIELDS = {"room": (), "user": ()} + +TYPE_TO_TABLE = {"room": ("room_stats", "room_id"), "user": ("user_stats", "user_id")} + + +class OldCollectionRequired(Exception): + """ Signal that we need to collect old stats rows and retry. """ + + pass class StatsStore(StateDeltasStore): @@ -48,6 +59,21 @@ class StatsStore(StateDeltasStore): self.register_noop_background_update("populate_stats_process_rooms") self.register_noop_background_update("populate_stats_cleanup") + def quantise_stats_time(self, ts): + """ + Quantises a timestamp to be a multiple of the bucket size. + + Args: + ts: the timestamp to quantise, in seconds since the Unix Epoch + + Returns: + a timestamp which + - is divisible by the bucket size; + - is no later than `ts`; and + - is the largest such timestamp. 
+ """ + return (ts // self.stats_bucket_size) * self.stats_bucket_size + def update_room_state(self, room_id, fields): """ Args: @@ -76,3 +102,144 @@ class StatsStore(StateDeltasStore): values=fields, desc="update_room_state", ) + + @defer.inlineCallbacks + def update_stats_delta( + self, ts, stats_type, stats_id, fields, complete_with_stream_id=None + ): + """ + Updates the statistics for a subject, with a delta (difference/relative + change). + + Args: + ts (int): timestamp of the change + stats_type (str): "room" or "user" – the kind of subject + stats_id (str): the subject's ID (room ID or user ID) + fields (dict[str, int]): Deltas of stats values. + complete_with_stream_id (int, optional): + If supplied, converts an incomplete row into a complete row, + with the supplied stream_id marked as the stream_id where the + row was completed. + """ + + while True: + try: + res = yield self.runInteraction( + "update_stats_delta", + self._update_stats_delta_txn, + ts, + stats_type, + stats_id, + fields, + complete_with_stream_id=complete_with_stream_id, + ) + return res + except OldCollectionRequired: + # retry after collecting old rows + # TODO (implement later) + raise NotImplementedError("old collection not in this PR") + + def _update_stats_delta_txn( + self, + txn, + ts, + stats_type, + stats_id, + fields, + complete_with_stream_id=None, + absolute_fields=None, + ): + """ + See L{update_stats_delta} + Additional Args: + absolute_fields (dict[str, int]): Absolute stats values (i.e. not deltas). + """ + table, id_col = TYPE_TO_TABLE[stats_type] + + quantised_ts = self.quantise_stats_time(int(ts)) + end_ts = quantised_ts + self.stats_bucket_size + + field_sqls = ["%s = %s + ?" % (field, field) for field in fields.keys()] + field_values = list(fields.values()) + + if absolute_fields is not None: + field_sqls += ["%s = ?" 
% (field,) for field in absolute_fields.keys()] + field_values += list(absolute_fields.values()) + + if complete_with_stream_id is not None: + field_sqls.append("completed_delta_stream_id = ?") + field_values.append(complete_with_stream_id) + + sql = ( + "UPDATE %s_current SET end_ts = ?, %s" + " WHERE (end_ts IS NOT NULL AND (end_ts >= ? OR completed_delta_stream_id IS NULL))" + " AND %s = ?" + ) % (table, ", ".join(field_sqls), id_col) + + qargs = [end_ts] + list(field_values) + [end_ts, stats_id] + + txn.execute(sql, qargs) + + if txn.rowcount > 0: + # success. + return + + # if we're here, it's because we didn't succeed in updating a stats + # row. Why? Let's find out… + + current_row = self._simple_select_one_txn( + txn, + table + "_current", + {id_col: stats_id}, + ("end_ts", "completed_delta_stream_id"), + allow_none=True, + ) + + if current_row is None: + # we need to insert a row! (insert a dirty, incomplete row) + insertee = { + id_col: stats_id, + "end_ts": end_ts, + "start_ts": ts, + "completed_delta_stream_id": complete_with_stream_id, + } + + # we assume that, by default, blank fields should be zero. + for field_name in ABSOLUTE_STATS_FIELDS[stats_type]: + insertee[field_name] = 0 + + for field_name in PER_SLICE_FIELDS[stats_type]: + insertee[field_name] = 0 + + for (field, value) in fields.items(): + insertee[field] = value + + if absolute_fields is not None: + for (field, value) in absolute_fields.items(): + insertee[field] = value + + self._simple_insert_txn(txn, table + "_current", insertee) + + elif current_row["end_ts"] is None: + # update the row, including start_ts + sql = ( + "UPDATE %s_current SET start_ts = ?, end_ts = ?, %s" + " WHERE end_ts IS NULL AND %s = ?" 
+ ) % (table, ", ".join(field_sqls), id_col) + + qargs = ( + [end_ts - self.stats_bucket_size, end_ts] + + list(field_values) + + [stats_id] + ) + + txn.execute(sql, qargs) + if txn.rowcount == 0: + raise RuntimeError( + "Should be impossible: No rows updated" + " but all conditions are known to be met." + ) + + elif current_row["end_ts"] < end_ts: + # we need to perform old collection first + raise OldCollectionRequired() |