author     Olivier Wilkinson (reivilibre) <olivier@librepush.net> 2019-08-27 13:54:38 +0100
committer  Olivier Wilkinson (reivilibre) <olivier@librepush.net> 2019-08-27 13:54:38 +0100
commit     bc754cdeed38be79fac33e84fc2a78307d799676 (patch)
tree       af78f1d2dd225c1fb376684d4efed2e306c0d269 /synapse/storage/stats.py
parent     Handle state deltas and turn them into stats deltas (diff)
parent     Don't include the room & user stats docs in this PR. (diff)
download   synapse-bc754cdeed38be79fac33e84fc2a78307d799676.tar.xz
Merge branch 'rei/rss_inc2' into rei/rss_inc3
Diffstat (limited to 'synapse/storage/stats.py')
-rw-r--r--  synapse/storage/stats.py  313
1 file changed, 214 insertions, 99 deletions
diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py
index 4112291c76..6832ec6b7f 100644
--- a/synapse/storage/stats.py
+++ b/synapse/storage/stats.py
@@ -18,8 +18,9 @@ import logging
+from itertools import chain
 from threading import Lock
 
 from twisted.internet import defer
 
 from synapse.storage.state_deltas import StateDeltasStore
 from synapse.util.caches.descriptors import cached
 
 logger = logging.getLogger(__name__)
@@ -69,10 +71,11 @@ class StatsStore(StateDeltasStore):
         Quantises a timestamp to be a multiple of the bucket size.
 
         Args:
-            ts: the timestamp to quantise, in seconds since the Unix Epoch
+            ts (int): the timestamp to quantise, in milliseconds since the Unix
+                Epoch
 
         Returns:
-            a timestamp which
+            int: a timestamp which
               - is divisible by the bucket size;
               - is no later than `ts`; and
               - is the largest such timestamp.
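
For illustration, the quantisation described above is plain integer floor
division; a minimal sketch, assuming millisecond timestamps and bucket sizes
as used in this module:

    def quantise(ts, bucket_size):
        # round `ts` down to the nearest multiple of `bucket_size`
        return (ts // bucket_size) * bucket_size

    # e.g. with one-day buckets (86400000 ms):
    assert quantise(1566912878000, 86400000) == 1566864000000
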
@@ -213,7 +216,6 @@ class StatsStore(StateDeltasStore):
             allow_none=True,
         )
 
-    @defer.inlineCallbacks
     def update_stats_delta(
         self, ts, stats_type, stats_id, fields, complete_with_stream_id=None
     ):
@@ -232,22 +234,162 @@ class StatsStore(StateDeltasStore):
                 row was completed.
         """
 
-        while True:
-            try:
-                res = yield self.runInteraction(
-                    "update_stats_delta",
-                    self._update_stats_delta_txn,
-                    ts,
-                    stats_type,
-                    stats_id,
-                    fields,
-                    complete_with_stream_id=complete_with_stream_id,
-                )
-                return res
-            except OldCollectionRequired:
-                # retry after collecting old rows
-                # TODO (implement later)
-                raise NotImplementedError("old collection not in this PR")
+        return self.runInteraction(
+            "update_stats_delta",
+            self._update_stats_delta_txn,
+            ts,
+            stats_type,
+            stats_id,
+            fields,
+            complete_with_stream_id=complete_with_stream_id,
+        )
+
+    def _upsert_with_additive_relatives_txn(
+        self, txn, table, keyvalues, absolutes, additive_relatives
+    ):
+        """Used to update values in the stats tables.
+
+        Args:
+            txn: Transaction
+            table (str): Table name
+            keyvalues (dict[str, any]): Row-identifying key values
+            absolutes (dict[str, any]): Absolute (set) fields
+            additive_relatives (dict[str, int]): Fields that will be added onto
+                if existing row present.
+        """
+        if self.database_engine.can_native_upsert:
+            absolute_updates = [
+                "%(field)s = EXCLUDED.%(field)s" % {"field": field}
+                for field in absolutes.keys()
+            ]
+
+            relative_updates = [
+                "%(field)s = EXCLUDED.%(field)s + %(table)s.%(field)s"
+                % {"table": table, "field": field}
+                for field in additive_relatives.keys()
+            ]
+
+            insert_cols = []
+            qargs = []
+
+            for (key, val) in chain(
+                keyvalues.items(), absolutes.items(), additive_relatives.items()
+            ):
+                insert_cols.append(key)
+                qargs.append(val)
+
+            sql = """
+                INSERT INTO %(table)s (%(insert_cols_cs)s)
+                VALUES (%(insert_vals_qs)s)
+                ON CONFLICT (%(key_columns)s) DO UPDATE SET %(updates)s
+            """ % {
+                "table": table,
+                "insert_cols_cs": ", ".join(insert_cols),
+                "insert_vals_qs": ", ".join(
+                    ["?"] * (len(keyvalues) + len(absolutes) + len(additive_relatives))
+                ),
+                "key_columns": ", ".join(keyvalues),
+                "updates": ", ".join(chain(absolute_updates, relative_updates)),
+            }
+
+            txn.execute(sql, qargs)
+        else:
+            self.database_engine.lock_table(txn, table)
+            retcols = list(chain(absolutes.keys(), additive_relatives.keys()))
+            current_row = self._simple_select_one_txn(
+                txn, table, keyvalues, retcols, allow_none=True
+            )
+            if current_row is None:
+                merged_dict = {**keyvalues, **absolutes, **additive_relatives}
+                self._simple_insert_txn(txn, table, merged_dict)
+            else:
+                for (key, val) in additive_relatives.items():
+                    current_row[key] += val
+                current_row.update(absolutes)
+                self._simple_update_one_txn(txn, table, keyvalues, current_row)
+
+    def _upsert_copy_from_table_with_additive_relatives_txn(
+        self,
+        txn,
+        into_table,
+        keyvalues,
+        extra_dst_keyvalues,
+        additive_relatives,
+        src_table,
+        copy_columns,
+        additional_where="",
+    ):
+        """
+        Args:
+             txn: Transaction
+             into_table (str): The destination table to UPSERT the row into
+             keyvalues (dict[str, any]): Row-identifying key values
+             extra_dst_keyvalues (dict[str, any]): Additional keyvalues
+                for `into_table`.
+             additive_relatives (dict[str, any]): Fields that will be added onto
+                if existing row present. (Must be disjoint from copy_columns.)
+             src_table (str): The source table to copy from
+             copy_columns (iterable[str]): The list of columns to copy
+             additional_where (str): Additional SQL for where (prefix with AND
+                if using).
+        """
+        if self.database_engine.can_native_upsert:
+            ins_columns = chain(
+                keyvalues, copy_columns, additive_relatives, extra_dst_keyvalues
+            )
+            sel_exprs = chain(
+                keyvalues,
+                copy_columns,
+                ("?" for _ in chain(additive_relatives, extra_dst_keyvalues)),
+            )
+            keyvalues_where = ("%s = ?" % f for f in keyvalues)
+
+            sets_cc = ("%s = EXCLUDED.%s" % (f, f) for f in copy_columns)
+            sets_ar = (
+                "%s = EXCLUDED.%s + %s.%s" % (f, f, into_table, f)
+                for f in additive_relatives
+            )
+
+            sql = """
+                INSERT INTO %(into_table)s (%(ins_columns)s)
+                SELECT %(sel_exprs)s
+                FROM %(src_table)s
+                WHERE %(keyvalues_where)s %(additional_where)s
+                ON CONFLICT (%(keyvalues)s)
+                DO UPDATE SET %(sets)s
+            """ % {
+                "into_table": into_table,
+                "ins_columns": ", ".join(ins_columns),
+                "sel_exprs": ", ".join(sel_exprs),
+                "keyvalues_where": " AND ".join(keyvalues_where),
+                "src_table": src_table,
+                "keyvalues": ", ".join(
+                    chain(keyvalues.keys(), extra_dst_keyvalues.keys())
+                ),
+                "sets": ", ".join(chain(sets_cc, sets_ar)),
+                "additional_where": additional_where,
+            }
+
+            qargs = list(
+                chain(
+                    additive_relatives.values(),
+                    extra_dst_keyvalues.values(),
+                    keyvalues.values(),
+                )
+            )
+            txn.execute(sql, qargs)
+        else:
+            self.database_engine.lock_table(txn, into_table)
+            src_row = self._simple_select_one_txn(
+                txn, src_table, keyvalues, copy_columns
+            )
+            dest_keyvalues = {**keyvalues, **extra_dst_keyvalues}
+            dest_current_row = self._simple_select_one_txn(
+                txn,
+                into_table,
+                dest_keyvalues,
+                list(chain(additive_relatives.keys(), copy_columns)),
+                allow_none=True,
+            )
+
+            if dest_current_row is None:
+                merged_dict = {
+                    **keyvalues,
+                    **extra_dst_keyvalues,
+                    **src_row,
+                    **additive_relatives,
+                }
+                self._simple_insert_txn(txn, into_table, merged_dict)
+            else:
+                for (key, val) in additive_relatives.items():
+                    src_row[key] = dest_current_row[key] + val
+                self._simple_update_txn(txn, into_table, dest_keyvalues, src_row)
 
     def _update_stats_delta_txn(
         self,
@@ -262,94 +404,67 @@ class StatsStore(StateDeltasStore):
         """
         See L{update_stats_delta}
         Additional Args:
-            absolute_fields (dict[str, int]): Absolute stats values (i.e. not deltas).
+            absolute_fields (dict[str, int]): Absolute current stats values
+                (i.e. not deltas). Does not work with per-slice fields.
         """
         table, id_col = TYPE_TO_TABLE[stats_type]
 
         quantised_ts = self.quantise_stats_time(int(ts))
         end_ts = quantised_ts + self.stats_bucket_size
 
-        field_sqls = ["%s = %s + ?" % (field, field) for field in fields.keys()]
-        field_values = list(fields.values())
-
-        if absolute_fields is not None:
-            field_sqls += ["%s = ?" % (field,) for field in absolute_fields.keys()]
-            field_values += list(absolute_fields.values())
-
-        if complete_with_stream_id is not None:
-            field_sqls.append("completed_delta_stream_id = ?")
-            field_values.append(complete_with_stream_id)
-
-        sql = (
-            "UPDATE %s_current SET end_ts = ?, %s"
-            " WHERE (end_ts IS NOT NULL AND (end_ts >= ? OR completed_delta_stream_id IS NULL))"
-            " AND %s = ?"
-        ) % (table, ", ".join(field_sqls), id_col)
-
-        qargs = [end_ts] + list(field_values) + [end_ts, stats_id]
-
-        txn.execute(sql, qargs)
-
-        if txn.rowcount > 0:
-            # success.
-            return
-
-        # if we're here, it's because we didn't succeed in updating a stats
-        # row. Why? Let's find out…
+        abs_field_names = ABSOLUTE_STATS_FIELDS[stats_type]
+        slice_field_names = PER_SLICE_FIELDS[stats_type]
+
+        if absolute_fields is None:
+            absolute_fields = {}
+
+        for field in chain(fields.keys(), absolute_fields.keys()):
+            if field not in abs_field_names and field not in slice_field_names:
+                # guard against potential SQL injection dodginess
+                raise ValueError(
+                    "%s is not a recognised field"
+                    " for stats type %s" % (field, stats_type)
+                )
 
-        current_row = self._simple_select_one_txn(
-            txn,
-            table + "_current",
-            {id_col: stats_id},
-            ("end_ts", "completed_delta_stream_id"),
-            allow_none=True,
+        # only absolute stats fields are tracked in the `_current` stats tables,
+        # so those are the only ones that we process deltas for when
+        # we upsert against the `_current` table.
+        additive_relatives = {
+            key: fields.get(key, 0)
+            for key in abs_field_names
+            if key not in absolute_fields
+        }
+
+        if complete_with_stream_id is not None:
+            absolute_fields = absolute_fields.copy()
+            absolute_fields["completed_delta_stream_id"] = complete_with_stream_id
+
+        # first upsert the `_current` table
+        self._upsert_with_additive_relatives_txn(
+            txn=txn,
+            table=table + "_current",
+            keyvalues={id_col: stats_id},
+            absolutes=absolute_fields,
+            additive_relatives=additive_relatives,
         )
 
-        if current_row is None:
-            # we need to insert a row! (insert a dirty, incomplete row)
-            insertee = {
-                id_col: stats_id,
-                "end_ts": end_ts,
-                "start_ts": ts,
-                "completed_delta_stream_id": complete_with_stream_id,
+        if self.has_completed_background_updates():
+            # TODO want to check specifically for stats regenerator, not all
+            #   background updates…
+            # then upsert the `_historical` table.
+            # we don't support absolute_fields for per-slice fields as it makes
+            # no sense.
+            per_slice_additive_relatives = {
+                key: fields.get(key, 0) for key in slice_field_names
             }
-
-            # we assume that, by default, blank fields should be zero.
-            for field_name in ABSOLUTE_STATS_FIELDS[stats_type]:
-                insertee[field_name] = 0
-
-            for field_name in PER_SLICE_FIELDS[stats_type]:
-                insertee[field_name] = 0
-
-            for (field, value) in fields.items():
-                insertee[field] = value
-
-            if absolute_fields is not None:
-                for (field, value) in absolute_fields.items():
-                    insertee[field] = value
-
-            self._simple_insert_txn(txn, table + "_current", insertee)
-
-        elif current_row["end_ts"] is None:
-            # update the row, including start_ts
-            sql = (
-                "UPDATE %s_current SET start_ts = ?, end_ts = ?, %s"
-                " WHERE end_ts IS NULL AND %s = ?"
-            ) % (table, ", ".join(field_sqls), id_col)
-
-            qargs = (
-                [end_ts - self.stats_bucket_size, end_ts]
-                + list(field_values)
-                + [stats_id]
+            self._upsert_copy_from_table_with_additive_relatives_txn(
+                txn=txn,
+                into_table=table + "_historical",
+                keyvalues={id_col: stats_id},
+                extra_dst_keyvalues={
+                    "end_ts": end_ts,
+                    "bucket_size": self.stats_bucket_size,
+                },
+                additive_relatives=per_slice_additive_relatives,
+                src_table=table + "_current",
+                copy_columns=abs_field_names,
+                additional_where=" AND completed_delta_stream_id IS NOT NULL",
             )
-
-            txn.execute(sql, qargs)
-            if txn.rowcount == 0:
-                raise RuntimeError(
-                    "Should be impossible: No rows updated"
-                    " but all conditions are known to be met."
-                )
-
-        elif current_row["end_ts"] < end_ts:
-            # we need to perform old collection first
-            raise OldCollectionRequired()
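
As a closing illustration of the upsert helpers above: on the native-upsert
(PostgreSQL) path, `_upsert_with_additive_relatives_txn` builds an
`INSERT ... ON CONFLICT ... DO UPDATE` statement in which absolute fields are
overwritten and relative fields are accumulated. A self-contained sketch of
that SQL construction (the table and field names in the example call are
illustrative, not taken from the schema):

    from itertools import chain

    def build_upsert_sql(table, keyvalues, absolutes, additive_relatives):
        # mirrors the native-upsert branch above
        absolute_updates = [
            "%(f)s = EXCLUDED.%(f)s" % {"f": f} for f in absolutes
        ]
        relative_updates = [
            "%(f)s = EXCLUDED.%(f)s + %(t)s.%(f)s" % {"t": table, "f": f}
            for f in additive_relatives
        ]
        cols = list(chain(keyvalues, absolutes, additive_relatives))
        return "INSERT INTO %s (%s) VALUES (%s) ON CONFLICT (%s) DO UPDATE SET %s" % (
            table,
            ", ".join(cols),
            ", ".join("?" for _ in cols),
            ", ".join(keyvalues),
            ", ".join(absolute_updates + relative_updates),
        )

    print(
        build_upsert_sql(
            "room_stats_current",
            {"room_id": "!abc:example.org"},
            {"current_state_events": 42},
            {"joined_members": 1},
        )
    )
    # prints (reformatted for readability):
    #   INSERT INTO room_stats_current
    #       (room_id, current_state_events, joined_members)
    #   VALUES (?, ?, ?)
    #   ON CONFLICT (room_id) DO UPDATE SET
    #       current_state_events = EXCLUDED.current_state_events,
    #       joined_members = EXCLUDED.joined_members + room_stats_current.joined_members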