author    | Olivier Wilkinson (reivilibre) <olivier@librepush.net> | 2019-08-20 14:36:14 +0100
committer | Olivier Wilkinson (reivilibre) <olivier@librepush.net> | 2019-08-20 14:36:14 +0100
commit    | 279e63aea201d07dace0cd7d817d89350597b6b2 (patch)
tree      | 7efb660029cbe8e010a915a75bd9d45ca42b4672 /synapse/storage
parent    | Handle state deltas and turn them into stats deltas (diff)
download  | synapse-279e63aea201d07dace0cd7d817d89350597b6b2.tar.xz
Collect old current stats rows when updating stats with deltas
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
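
Editor's note: in outline, `update_stats_delta` can hit `OldCollectionRequired` (raised elsewhere in stats.py, not shown in this diff) when finished "current" rows must be moved to the historical table before the update can proceed; the new `collect_old` drains those rows in bounded batches, and the update is then retried. A minimal sketch of that control flow, where `run_update` and `run_batch` are hypothetical stand-ins for the real `runInteraction` calls:

```python
class OldCollectionRequired(Exception):
    """Stand-in for the signal that finished rows need collecting first."""


def update_with_collection(run_update, run_batch, limit=500):
    # Keep retrying the update; each failure triggers one full round of
    # "old collection" before the next attempt.
    while True:
        try:
            return run_update()
        except OldCollectionRequired:
            # Drain finished "current" rows into the historical table in
            # small batches; run_batch returns True while the last batch
            # came back full, i.e. there may be more rows to collect.
            while run_batch(limit):
                pass
```

Batching keeps each collection transaction short, so concurrent stats writers block only briefly instead of piling up retries while a long collection runs.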
Diffstat (limited to 'synapse/storage')
-rw-r--r-- | synapse/storage/stats.py | 81
1 file changed, 79 insertions, 2 deletions
diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py
index 4112291c76..c2f11b84bd 100644
--- a/synapse/storage/stats.py
+++ b/synapse/storage/stats.py
@@ -213,6 +213,84 @@ class StatsStore(StateDeltasStore):
             allow_none=True,
         )
 
+    def _collect_old_txn(self, txn, stats_type, limit=500):
+        """
+        See {collect_old}. Runs only a small batch, specified by limit.
+
+        Returns (bool):
+            True iff there is possibly more to do (i.e. this needs re-running),
+            False otherwise.
+
+        """
+        # we do them in batches to prevent concurrent updates from
+        # messing us over with lots of retries
+
+        now = self.hs.get_reactor().seconds()
+        quantised_ts = self.quantise_stats_time(now)
+        table, id_col = TYPE_TO_TABLE[stats_type]
+
+        fields = ", ".join(
+            field
+            for field in chain(
+                ABSOLUTE_STATS_FIELDS[stats_type], PER_SLICE_FIELDS[stats_type]
+            )
+        )
+
+        # `end_ts IS NOT NULL` is for partial index optimisation
+        if isinstance(self.database_engine, Sqlite3Engine):
+            # SQLite doesn't support SELECT FOR UPDATE
+            sql = (
+                "SELECT %s FROM %s_current"
+                " WHERE end_ts <= ? AND end_ts IS NOT NULL"
+                " LIMIT %d"
+            ) % (id_col, table, limit)
+        else:
+            sql = (
+                "SELECT %s FROM %s_current"
+                " WHERE end_ts <= ? AND end_ts IS NOT NULL"
+                " LIMIT %d FOR UPDATE"
+            ) % (id_col, table, limit)
+        txn.execute(sql, (quantised_ts,))
+        maybe_more = txn.rowcount == limit
+        updates = txn.fetchall()
+
+        sql = (
+            "INSERT INTO %s_historical (%s, %s, bucket_size, end_ts)"
+            " SELECT %s, %s, end_ts - start_ts AS bucket_size, end_ts"
+            " FROM %s_current WHERE %s = ?"
+        ) % (table, id_col, fields, id_col, fields, table, id_col)
+        txn.executemany(sql, updates)
+
+        sql = ("UPDATE %s_current SET start_ts = NULL, end_ts = NULL WHERE %s = ?") % (
+            table,
+            id_col,
+        )
+        txn.executemany(sql, updates)
+
+        return maybe_more
+
+    @defer.inlineCallbacks
+    def collect_old(self, stats_type):
+        """
+        Run 'old collection' on current stats rows.
+
+        Old collection is the process of copying dirty (updated) stats rows
+        from the current table to the historical table, when those rows have
+        finished their stats time slice.
+        Collected rows are then cleared of their dirty status.
+
+        Args:
+            stats_type: "room" or "user" – the type of stats to run old collection
+                on.
+
+        """
+        while True:
+            maybe_more = yield self.runInteraction(
+                "stats_collect_old", self._collect_old_txn, stats_type
+            )
+            if not maybe_more:
+                return None
+
     @defer.inlineCallbacks
     def update_stats_delta(
         self, ts, stats_type, stats_id, fields, complete_with_stream_id=None
@@ -246,8 +324,7 @@ class StatsStore(StateDeltasStore):
                 return res
             except OldCollectionRequired:
                 # retry after collecting old rows
-                # TODO (implement later)
-                raise NotImplementedError("old collection not in this PR")
+                yield self.collect_old(stats_type)
 
     def _update_stats_delta_txn(
         self,
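
For illustration, here is roughly what the PostgreSQL branch of `_collect_old_txn` renders. The `("room_stats", "room_id")` pair is an assumed example of what `TYPE_TO_TABLE` (defined elsewhere in stats.py, outside this diff) might map `"room"` to:

```python
# Sketch only: table and id_col values are assumptions, not taken from
# this diff; TYPE_TO_TABLE supplies the real mapping.
table, id_col = "room_stats", "room_id"
limit = 500

sql = (
    "SELECT %s FROM %s_current"
    " WHERE end_ts <= ? AND end_ts IS NOT NULL"
    " LIMIT %d FOR UPDATE"
) % (id_col, table, limit)

print(sql)
# SELECT room_id FROM room_stats_current WHERE end_ts <= ?
#   AND end_ts IS NOT NULL LIMIT 500 FOR UPDATE   (printed as one line)
```

`FOR UPDATE` row-locks the selected batch so concurrent delta writers wait briefly rather than conflict with the collection; the clause is dropped on SQLite, which does not support it (its write transactions lock the whole database anyway).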