diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py
index 4112291c76..c2f11b84bd 100644
--- a/synapse/storage/stats.py
+++ b/synapse/storage/stats.py
@@ -213,6 +213,84 @@ class StatsStore(StateDeltasStore):
allow_none=True,
)
+    def _collect_old_txn(self, txn, stats_type, limit=500):
+        """
+        See collect_old. Runs only a small batch, specified by `limit`.
+
+        Returns (bool):
+            True iff there is possibly more to do (i.e. this needs re-running),
+            False otherwise.
+
+        """
+        # we do them in batches to prevent concurrent updates from
+        # messing us over with lots of retries
+
+        now = self.hs.get_reactor().seconds()
+        quantised_ts = self.quantise_stats_time(now)
+        table, id_col = TYPE_TO_TABLE[stats_type]
+
+        fields = ", ".join(
+            field
+            for field in chain(
+                ABSOLUTE_STATS_FIELDS[stats_type], PER_SLICE_FIELDS[stats_type]
+            )
+        )
+
+        # `end_ts IS NOT NULL` is for partial index optimisation
+        if isinstance(self.database_engine, Sqlite3Engine):
+            # SQLite doesn't support SELECT FOR UPDATE
+            sql = (
+                "SELECT %s FROM %s_current"
+                " WHERE end_ts <= ? AND end_ts IS NOT NULL"
+                " LIMIT %d"
+            ) % (id_col, table, limit)
+        else:
+            sql = (
+                "SELECT %s FROM %s_current"
+                " WHERE end_ts <= ? AND end_ts IS NOT NULL"
+                " LIMIT %d FOR UPDATE"
+            ) % (id_col, table, limit)
+        txn.execute(sql, (quantised_ts,))
+        updates = txn.fetchall()
+        # cursor.rowcount is unreliable for SELECT (sqlite3 gives -1): count rows
+        maybe_more = len(updates) == limit
+        sql = (
+            "INSERT INTO %s_historical (%s, %s, bucket_size, end_ts)"
+            " SELECT %s, %s, end_ts - start_ts AS bucket_size, end_ts"
+            " FROM %s_current WHERE %s = ?"
+        ) % (table, id_col, fields, id_col, fields, table, id_col)
+        txn.executemany(sql, updates)
+
+        sql = ("UPDATE %s_current SET start_ts = NULL, end_ts = NULL WHERE %s = ?") % (
+            table,
+            id_col,
+        )
+        txn.executemany(sql, updates)
+
+        return maybe_more
+
+    @defer.inlineCallbacks
+    def collect_old(self, stats_type):
+        """
+        Run 'old collection' on current stats rows.
+
+        Old collection is the process of copying dirty (updated) stats rows
+        from the current table to the historical table, when those rows have
+        finished their stats time slice.
+        Collected rows are then cleared of their dirty status.
+
+        Args:
+            stats_type (str): "room" or "user" - the type of stats to run
+                old collection on.
+
+        """
+        while True:
+            maybe_more = yield self.runInteraction(
+                "stats_collect_old", self._collect_old_txn, stats_type
+            )
+            if not maybe_more:
+                return None
+
@defer.inlineCallbacks
def update_stats_delta(
self, ts, stats_type, stats_id, fields, complete_with_stream_id=None
@@ -246,8 +324,7 @@ class StatsStore(StateDeltasStore):
return res
except OldCollectionRequired:
# retry after collecting old rows
- # TODO (implement later)
- raise NotImplementedError("old collection not in this PR")
+ yield self.collect_old(stats_type)
def _update_stats_delta_txn(
self,
|