summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
authorOlivier Wilkinson (reivilibre) <olivier@librepush.net>2019-08-20 14:36:14 +0100
committerOlivier Wilkinson (reivilibre) <olivier@librepush.net>2019-08-20 14:36:14 +0100
commit279e63aea201d07dace0cd7d817d89350597b6b2 (patch)
tree7efb660029cbe8e010a915a75bd9d45ca42b4672 /synapse/storage
parentHandle state deltas and turn them into stats deltas (diff)
downloadsynapse-279e63aea201d07dace0cd7d817d89350597b6b2.tar.xz
Collect old current stats rows when updating stats with deltas
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/stats.py81
1 file changed, 79 insertions, 2 deletions
diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py
index 4112291c76..c2f11b84bd 100644
--- a/synapse/storage/stats.py
+++ b/synapse/storage/stats.py
@@ -213,6 +213,84 @@ class StatsStore(StateDeltasStore):
             allow_none=True,
         )
 
+    def _collect_old_txn(self, txn, stats_type, limit=500):
+        """
+        See {collect_old}. Runs only a small batch, specified by limit.
+
+        Returns (bool):
+            True iff there is possibly more to do (i.e. this needs re-running),
+            False otherwise.
+
+        """
+        # we do them in batches to prevent concurrent updates from
+        # messing us over with lots of retries
+
+        now = self.hs.get_reactor().seconds()
+        quantised_ts = self.quantise_stats_time(now)
+        table, id_col = TYPE_TO_TABLE[stats_type]
+
+        fields = ", ".join(
+            field
+            for field in chain(
+                ABSOLUTE_STATS_FIELDS[stats_type], PER_SLICE_FIELDS[stats_type]
+            )
+        )
+
+        # `end_ts IS NOT NULL` is for partial index optimisation
+        if isinstance(self.database_engine, Sqlite3Engine):
+            # SQLite doesn't support SELECT FOR UPDATE
+            sql = (
+                "SELECT %s FROM %s_current"
+                " WHERE end_ts <= ? AND end_ts IS NOT NULL"
+                " LIMIT %d"
+            ) % (id_col, table, limit)
+        else:
+            sql = (
+                "SELECT %s FROM %s_current"
+                " WHERE end_ts <= ? AND end_ts IS NOT NULL"
+                " LIMIT %d FOR UPDATE"
+            ) % (id_col, table, limit)
+        txn.execute(sql, (quantised_ts,))
+        maybe_more = txn.rowcount == limit
+        updates = txn.fetchall()
+
+        sql = (
+            "INSERT INTO %s_historical (%s, %s, bucket_size, end_ts)"
+            " SELECT %s, %s, end_ts - start_ts AS bucket_size, end_ts"
+            " FROM %s_current WHERE %s = ?"
+        ) % (table, id_col, fields, id_col, fields, table, id_col)
+        txn.executemany(sql, updates)
+
+        sql = ("UPDATE %s_current SET start_ts = NULL, end_ts = NULL WHERE %s = ?") % (
+            table,
+            id_col,
+        )
+        txn.executemany(sql, updates)
+
+        return maybe_more
+
+    @defer.inlineCallbacks
+    def collect_old(self, stats_type):
+        """
+        Run 'old collection' on current stats rows.
+
+        Old collection is the process of copying dirty (updated) stats rows
+        from the current table to the historical table, when those rows have
+        finished their stats time slice.
+        Collected rows are then cleared of their dirty status.
+
+        Args:
+            stats_type: "room" or "user" – the type of stats to run old collection
+                on.
+
+        """
+        while True:
+            maybe_more = yield self.runInteraction(
+                "stats_collect_old", self._collect_old_txn, stats_type
+            )
+            if not maybe_more:
+                return None
+
     @defer.inlineCallbacks
     def update_stats_delta(
         self, ts, stats_type, stats_id, fields, complete_with_stream_id=None
@@ -246,8 +324,7 @@ class StatsStore(StateDeltasStore):
                 return res
             except OldCollectionRequired:
                 # retry after collecting old rows
-                # TODO (implement later)
-                raise NotImplementedError("old collection not in this PR")
+                yield self.collect_old(stats_type)
 
     def _update_stats_delta_txn(
         self,