summary refs log tree commit diff
path: root/synapse/storage/stats.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/stats.py')
-rw-r--r--synapse/storage/stats.py280
1 files changed, 171 insertions, 109 deletions
diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py
index cebe4cbc57..4cb10dc9fb 100644
--- a/synapse/storage/stats.py
+++ b/synapse/storage/stats.py
@@ -105,7 +105,6 @@ class StatsStore(StateDeltasStore):
             desc="update_room_state",
         )
 
-    @defer.inlineCallbacks
     def update_stats_delta(
         self, ts, stats_type, stats_id, fields, complete_with_stream_id=None
     ):
@@ -124,22 +123,142 @@ class StatsStore(StateDeltasStore):
                 row was completed.
         """
 
-        while True:
-            try:
-                res = yield self.runInteraction(
-                    "update_stats_delta",
-                    self._update_stats_delta_txn,
-                    ts,
-                    stats_type,
-                    stats_id,
-                    fields,
-                    complete_with_stream_id=complete_with_stream_id,
-                )
-                return res
-            except OldCollectionRequired:
-                # retry after collecting old rows
-                # TODO (implement later)
-                raise NotImplementedError("old collection not in this PR")
+        return self.runInteraction(
+            "update_stats_delta",
+            self._update_stats_delta_txn,
+            ts,
+            stats_type,
+            stats_id,
+            fields,
+            complete_with_stream_id=complete_with_stream_id,
+        )
+
+    def _upsert_with_additive_relatives_txn(
+        self, txn, table, keyvalues, absolutes, additive_relatives
+    ):
+        """
+
+        Args:
+            txn: Transaction
+            table (str): Table name
+            keyvalues (dict[str, any]): Row-identifying key values
+            absolutes (dict[str, any]): Absolute (set) fields
+            additive_relatives (dict[str, int]): Fields that will be added onto
+                if existing row present.
+        """
+        if self.database_engine.can_native_upsert:
+            absolute_updates = [
+                "%(field)s = EXCLUDED.%(field)s" % {"field": field}
+                for field in absolutes.keys()
+            ]
+
+            relative_updates = [
+                "%(field)s = EXCLUDED.%(field)s + %(table)s.%(field)s"
+                % {"table": table, "field": field}
+                for field in additive_relatives.keys()
+            ]
+
+            insert_cols = []
+            qargs = [table]
+
+            for (key, val) in chain(
+                keyvalues.items(), absolutes.items(), additive_relatives.items()
+            ):
+                insert_cols.append(key)
+                qargs.append(val)
+
+            sql = """
+                INSERT INTO %(table)s (%(insert_cols_cs)s)
+                VALUES (%(insert_vals_qs)s)
+                ON CONFLICT DO UPDATE SET %(updates)s
+            """ % {
+                "table": table,
+                "insert_cols_cs": ", ".join(insert_cols),
+                "insert_vals_qs": ", ".join(
+                    ["?"] * (len(keyvalues) + len(absolutes) + len(additive_relatives))
+                ),
+                "updates": ", ".join(chain(absolute_updates, relative_updates)),
+            }
+
+            txn.execute(sql, qargs)
+        else:
+            retcols = chain(absolutes.keys(), additive_relatives.keys())
+            current_row = self._simple_select_one_txn(
+                txn, table, keyvalues, retcols, allow_none=True
+            )
+            if current_row is None:
+                merged_dict = {**keyvalues, **absolutes, **additive_relatives}
+                self._simple_insert_txn(txn, table, merged_dict)
+            else:
+                for (key, val) in additive_relatives.items():
+                    current_row[key] += val
+                for (key, val) in absolutes.items():
+                    current_row[key] = val
+                self._simple_update_one_txn(txn, table, keyvalues, current_row)
+
+    def _upsert_copy_from_table_with_additive_relatives_txn(
+        self, txn, into_table, keyvalues, additive_relatives, src_table, copy_columns
+    ):
+        """
+        Args:
+             txn: Transaction
+             into_table (str): The destination table to UPSERT the row into
+             keyvalues (dict[str, any]): Row-identifying key values
+             additive_relatives (dict[str, any]): Fields that will be added onto
+                if existing row present. (Must be disjoint from copy_columns.)
+             src_table (str): The source table to copy from
+             copy_columns (iterable[str]): The list of columns to copy
+        """
+        if self.database_engine.can_native_upsert:
+            ins_columns = chain(keyvalues, copy_columns, additive_relatives.keys())
+            sel_exprs = chain(
+                keyvalues, copy_columns, ("?" for _ in additive_relatives)
+            )
+            keyvalues_where = ("%s = ?" % f for f in keyvalues)
+
+            sets_cc = ("%s = EXCLUDED.%s" % (f, f) for f in copy_columns)
+            sets_ar = (
+                "%s = EXCLUDED.%s + %s.%s" % (f, f, into_table, f) for f in copy_columns
+            )
+
+            sql = """
+                INSERT INTO %(into_table)s (%(ins_columns)s)
+                SELECT %(sel_exprs)s
+                FROM %(src_table)s
+                WHERE %(keyvalues_where)s
+                ON CONFLICT (%(keyvalues)s)
+                DO UPDATE SET %(sets)s
+            """ % {
+                "into_table": into_table,
+                "ins_columns": ", ".join(ins_columns),
+                "sel_exprs": ", ".join(sel_exprs),
+                "keyvalues_where": ", ".join(keyvalues_where),
+                "src_table": src_table,
+                "keyvalues": ", ".join(keyvalues.keys()),
+                "sets": ", ".join(chain(sets_cc, sets_ar)),
+            }
+
+            qargs = chain(additive_relatives.values(), keyvalues.values())
+            txn.execute(sql, qargs)
+        else:
+            src_row = self._simple_select_one_txn(
+                txn, src_table, keyvalues, copy_columns
+            )
+            dest_current_row = self._simple_select_one_txn(
+                txn,
+                into_table,
+                keyvalues,
+                chain(additive_relatives.keys(), copy_columns),
+                allow_none=True,
+            )
+
+            if dest_current_row is None:
+                merged_dict = {**keyvalues, **src_row, **additive_relatives}
+                self._simple_insert_txn(txn, into_table, merged_dict)
+            else:
+                for (key, val) in additive_relatives.items():
+                    src_row[key] = dest_current_row[key] + val
+                self._simple_update_txn(txn, into_table, keyvalues, src_row)
 
     def _update_stats_delta_txn(
         self,
@@ -154,118 +273,61 @@ class StatsStore(StateDeltasStore):
         """
         See L{update_stats_delta}
         Additional Args:
-            absolute_fields (dict[str, int]): Absolute stats values (i.e. not deltas).
+            absolute_fields (dict[str, int]): Absolute current stats values
+                (i.e. not deltas). Does not work with per-slice fields.
         """
         table, id_col = TYPE_TO_TABLE[stats_type]
 
         quantised_ts = self.quantise_stats_time(int(ts))
         end_ts = quantised_ts + self.stats_bucket_size
 
+        abs_field_names = ABSOLUTE_STATS_FIELDS[stats_type]
+        slice_field_names = PER_SLICE_FIELDS[stats_type]
         for field in chain(fields.keys(), absolute_fields.keys()):
-            if (
-                field not in ABSOLUTE_STATS_FIELDS[stats_type]
-                and field not in PER_SLICE_FIELDS[stats_type]
-            ):
+            if field not in abs_field_names and field not in slice_field_names:
                 # guard against potential SQL injection dodginess
                 raise ValueError(
                     "%s is not a recognised field"
                     " for stats type %s" % (field, stats_type)
                 )
 
-        field_sqls = ["%s = %s + ?" % (field, field) for field in fields.keys()]
-        field_values = list(fields.values())
-
-        if absolute_fields is not None:
-            field_sqls += ["%s = ?" % (field,) for field in absolute_fields.keys()]
-            field_values += list(absolute_fields.values())
-
-        if complete_with_stream_id is not None:
-            field_sqls.append("completed_delta_stream_id = ?")
-            field_values.append(complete_with_stream_id)
-
-        # update current row, but only if it is either:
-        # - dirty and complete but not old
-        #       If it is old, we should old-collect it first and retry.
-        # - dirty and incomplete
-        #       Incomplete rows can't be old-collected (as this would commit
-        #       false statistics into the _historical table).
-        #       Instead, their `end_ts` is extended, whilst we wait for them to
-        #       become complete at the hand of the stats regenerator.
-        sql = (
-            "UPDATE %s_current SET end_ts = ?, %s"
-            " WHERE (end_ts IS NOT NULL AND (end_ts >= ? OR completed_delta_stream_id IS NULL))"
-            " AND %s = ?"
-        ) % (table, ", ".join(field_sqls), id_col)
-
-        qargs = [end_ts] + list(field_values) + [end_ts, stats_id]
-
-        txn.execute(sql, qargs)
+        additive_relatives = {
+            key: fields.get(key, 0)
+            for key in abs_field_names
+            if key not in absolute_fields
+        }
 
-        if txn.rowcount > 0:
-            # success.
-            return
+        if absolute_fields is None:
+            absolute_fields = {}
+        elif complete_with_stream_id is not None:
+            absolute_fields = absolute_fields.copy()
 
-        # if we're here, it's because we didn't succeed in updating a stats
-        # row. Why? Let's find out…
+        absolute_fields["completed_delta_stream_id"] = complete_with_stream_id
 
-        current_row = self._simple_select_one_txn(
+        # first upsert the current table
+        self._upsert_with_additive_relatives_txn(
             txn,
             table + "_current",
             {id_col: stats_id},
-            ("end_ts", "completed_delta_stream_id"),
-            allow_none=True,
+            absolute_fields,
+            additive_relatives,
         )
 
-        if current_row is None:
-            # Failure reason: There is no row.
-            # Solution:
-            # we need to insert a row! (insert a dirty, incomplete row)
-            insertee = {
-                id_col: stats_id,
-                "end_ts": end_ts,
-                "start_ts": ts,
-                "completed_delta_stream_id": complete_with_stream_id,
+        if self.has_completed_background_updates():
+            # TODO want to check specifically for stats regenerator, not all
+            #   background updates…
+            # then upsert the historical table.
+            # we don't support absolute_fields for slice_field_names as it makes
+            # no sense.
+            per_slice_additive_relatives = {
+                key: fields.get(key, 0)
+                for key in slice_field_names
             }
-
-            # we assume that, by default, blank fields should be zero.
-            for field_name in ABSOLUTE_STATS_FIELDS[stats_type]:
-                insertee[field_name] = 0
-
-            for field_name in PER_SLICE_FIELDS[stats_type]:
-                insertee[field_name] = 0
-
-            for (field, value) in fields.items():
-                insertee[field] = value
-
-            if absolute_fields is not None:
-                for (field, value) in absolute_fields.items():
-                    insertee[field] = value
-
-            self._simple_insert_txn(txn, table + "_current", insertee)
-
-        elif current_row["end_ts"] is None:
-            # Failure reason: The row is not dirty.
-            # Solution:
-            # update the row, including `start_ts`, to make it dirty.
-            sql = (
-                "UPDATE %s_current SET start_ts = ?, end_ts = ?, %s"
-                " WHERE end_ts IS NULL AND %s = ?"
-            ) % (table, ", ".join(field_sqls), id_col)
-
-            qargs = (
-                [end_ts - self.stats_bucket_size, end_ts]
-                + list(field_values)
-                + [stats_id]
+            self._upsert_copy_from_table_with_additive_relatives_txn(
+                txn,
+                table + "_historical",
+                {id_col: stats_id},
+                per_slice_additive_relatives,
+                table + "_current",
+                abs_field_names
             )
-
-            txn.execute(sql, qargs)
-            if txn.rowcount == 0:
-                raise RuntimeError(
-                    "Should be impossible: No rows updated"
-                    " but all conditions are known to be met."
-                )
-
-        elif current_row["end_ts"] < end_ts:
-            # Failure reason: The row is complete and old.
-            # Solution: We need to perform old collection first
-            raise OldCollectionRequired()