summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/schema/delta/56/stats_separated1.sql15
-rw-r--r--synapse/storage/schema/delta/56/stats_separated2.py18
-rw-r--r--synapse/storage/stats.py280
3 files changed, 175 insertions, 138 deletions
diff --git a/synapse/storage/schema/delta/56/stats_separated1.sql b/synapse/storage/schema/delta/56/stats_separated1.sql
index 1e17eae226..d7418fdf1e 100644
--- a/synapse/storage/schema/delta/56/stats_separated1.sql
+++ b/synapse/storage/schema/delta/56/stats_separated1.sql
@@ -63,14 +63,10 @@ INSERT INTO stats_incremental_position (
 ) VALUES (NULL, NULL, NULL, (0 = 1)), (NULL, NULL, NULL, (1 = 1));
 
 -- represents PRESENT room statistics for a room
+-- only holds absolute fields
 CREATE TABLE IF NOT EXISTS room_stats_current (
     room_id TEXT NOT NULL PRIMARY KEY,
 
-    -- These starts cover the time from start_ts...end_ts (in seconds).
-    -- Note that end_ts is quantised, and start_ts usually so.
-    start_ts BIGINT,
-    end_ts BIGINT,
-
     current_state_events INT NOT NULL,
     total_events INT NOT NULL,
     joined_members INT NOT NULL,
@@ -82,8 +78,6 @@ CREATE TABLE IF NOT EXISTS room_stats_current (
     -- If initial stats regen has been performed: the maximum delta stream
     --  position that this row takes into account.
     completed_delta_stream_id BIGINT,
-
-    CONSTRAINT timestamp_nullity_equality CHECK ((start_ts IS NULL) = (end_ts IS NULL))
 );
 
 
@@ -91,7 +85,7 @@ CREATE TABLE IF NOT EXISTS room_stats_current (
 CREATE TABLE IF NOT EXISTS room_stats_historical (
     room_id TEXT NOT NULL,
     -- These stats cover the time from (end_ts - bucket_size)...end_ts (in seconds).
-    -- Note that end_ts is quantised, and start_ts usually so.
+    -- Note that end_ts is quantised.
     end_ts BIGINT NOT NULL,
     bucket_size INT NOT NULL,
 
@@ -113,13 +107,10 @@ CREATE INDEX IF NOT EXISTS room_stats_historical_end_ts ON room_stats_historical
 
 
 -- represents PRESENT statistics for a user
+-- only holds absolute fields
 CREATE TABLE IF NOT EXISTS user_stats_current (
     user_id TEXT NOT NULL PRIMARY KEY,
 
-    -- The timestamp that represents the start of the
-    start_ts BIGINT,
-    end_ts BIGINT,
-
     public_rooms INT NOT NULL,
     private_rooms INT NOT NULL,
 
diff --git a/synapse/storage/schema/delta/56/stats_separated2.py b/synapse/storage/schema/delta/56/stats_separated2.py
index 049867fa3e..942d240010 100644
--- a/synapse/storage/schema/delta/56/stats_separated2.py
+++ b/synapse/storage/schema/delta/56/stats_separated2.py
@@ -34,29 +34,13 @@ def _run_create_generic(stats_type, cursor, database_engine):
         # partial index are not a big concern.
         cursor.execute(
             """
-                CREATE INDEX IF NOT EXISTS %s_stats_current_dirty
-                    ON %s_stats_current (end_ts);
-            """
-            % (stats_type, stats_type)
-        )
-        cursor.execute(
-            """
                 CREATE INDEX IF NOT EXISTS %s_stats_not_complete
                     ON %s_stats_current (completed_delta_stream_id, %s_id);
             """
             % (stats_type, stats_type, stats_type)
         )
     elif isinstance(database_engine, PostgresEngine):
-        # This partial index helps us with finding dirty stats rows
-        cursor.execute(
-            """
-                CREATE INDEX IF NOT EXISTS %s_stats_current_dirty
-                    ON %s_stats_current (end_ts)
-                    WHERE end_ts IS NOT NULL;
-            """
-            % (stats_type, stats_type)
-        )
-        # This partial index helps us with old collection
+        # This partial index helps us with finding incomplete stats rows
         cursor.execute(
             """
                 CREATE INDEX IF NOT EXISTS %s_stats_not_complete
diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py
index cebe4cbc57..4cb10dc9fb 100644
--- a/synapse/storage/stats.py
+++ b/synapse/storage/stats.py
@@ -105,7 +105,6 @@ class StatsStore(StateDeltasStore):
             desc="update_room_state",
         )
 
-    @defer.inlineCallbacks
     def update_stats_delta(
         self, ts, stats_type, stats_id, fields, complete_with_stream_id=None
     ):
@@ -124,22 +123,142 @@ class StatsStore(StateDeltasStore):
                 row was completed.
         """
 
-        while True:
-            try:
-                res = yield self.runInteraction(
-                    "update_stats_delta",
-                    self._update_stats_delta_txn,
-                    ts,
-                    stats_type,
-                    stats_id,
-                    fields,
-                    complete_with_stream_id=complete_with_stream_id,
-                )
-                return res
-            except OldCollectionRequired:
-                # retry after collecting old rows
-                # TODO (implement later)
-                raise NotImplementedError("old collection not in this PR")
+        return self.runInteraction(
+            "update_stats_delta",
+            self._update_stats_delta_txn,
+            ts,
+            stats_type,
+            stats_id,
+            fields,
+            complete_with_stream_id=complete_with_stream_id,
+        )
+
+    def _upsert_with_additive_relatives_txn(
+        self, txn, table, keyvalues, absolutes, additive_relatives
+    ):
+        """
+
+        Args:
+            txn: Transaction
+            table (str): Table name
+            keyvalues (dict[str, any]): Row-identifying key values
+            absolutes (dict[str, any]): Absolute (set) fields
+            additive_relatives (dict[str, int]): Fields that will be added onto
+                if existing row present.
+        """
+        if self.database_engine.can_native_upsert:
+            absolute_updates = [
+                "%(field)s = EXCLUDED.%(field)s" % {"field": field}
+                for field in absolutes.keys()
+            ]
+
+            relative_updates = [
+                "%(field)s = EXCLUDED.%(field)s + %(table)s.%(field)s"
+                % {"table": table, "field": field}
+                for field in additive_relatives.keys()
+            ]
+
+            insert_cols = []
+            qargs = [table]
+
+            for (key, val) in chain(
+                keyvalues.items(), absolutes.items(), additive_relatives.items()
+            ):
+                insert_cols.append(key)
+                qargs.append(val)
+
+            sql = """
+                INSERT INTO %(table)s (%(insert_cols_cs)s)
+                VALUES (%(insert_vals_qs)s)
+                ON CONFLICT DO UPDATE SET %(updates)s
+            """ % {
+                "table": table,
+                "insert_cols_cs": ", ".join(insert_cols),
+                "insert_vals_qs": ", ".join(
+                    ["?"] * (len(keyvalues) + len(absolutes) + len(additive_relatives))
+                ),
+                "updates": ", ".join(chain(absolute_updates, relative_updates)),
+            }
+
+            txn.execute(sql, qargs)
+        else:
+            retcols = chain(absolutes.keys(), additive_relatives.keys())
+            current_row = self._simple_select_one_txn(
+                txn, table, keyvalues, retcols, allow_none=True
+            )
+            if current_row is None:
+                merged_dict = {**keyvalues, **absolutes, **additive_relatives}
+                self._simple_insert_txn(txn, table, merged_dict)
+            else:
+                for (key, val) in additive_relatives.items():
+                    current_row[key] += val
+                for (key, val) in absolutes.items():
+                    current_row[key] = val
+                self._simple_update_one_txn(txn, table, keyvalues, current_row)
+
+    def _upsert_copy_from_table_with_additive_relatives_txn(
+        self, txn, into_table, keyvalues, additive_relatives, src_table, copy_columns
+    ):
+        """
+        Args:
+             txn: Transaction
+             into_table (str): The destination table to UPSERT the row into
+             keyvalues (dict[str, any]): Row-identifying key values
+             additive_relatives (dict[str, any]): Fields that will be added onto
+                if existing row present. (Must be disjoint from copy_columns.)
+             src_table (str): The source table to copy from
+             copy_columns (iterable[str]): The list of columns to copy
+        """
+        if self.database_engine.can_native_upsert:
+            ins_columns = chain(keyvalues, copy_columns, additive_relatives.keys())
+            sel_exprs = chain(
+                keyvalues, copy_columns, ("?" for _ in additive_relatives)
+            )
+            keyvalues_where = ("%s = ?" % f for f in keyvalues)
+
+            sets_cc = ("%s = EXCLUDED.%s" % (f, f) for f in copy_columns)
+            sets_ar = (
+                "%s = EXCLUDED.%s + %s.%s" % (f, f, into_table, f) for f in copy_columns
+            )
+
+            sql = """
+                INSERT INTO %(into_table)s (%(ins_columns)s)
+                SELECT %(sel_exprs)s
+                FROM %(src_table)s
+                WHERE %(keyvalues_where)s
+                ON CONFLICT (%(keyvalues)s)
+                DO UPDATE SET %(sets)s
+            """ % {
+                "into_table": into_table,
+                "ins_columns": ", ".join(ins_columns),
+                "sel_exprs": ", ".join(sel_exprs),
+                "keyvalues_where": ", ".join(keyvalues_where),
+                "src_table": src_table,
+                "keyvalues": ", ".join(keyvalues.keys()),
+                "sets": ", ".join(chain(sets_cc, sets_ar)),
+            }
+
+            qargs = chain(additive_relatives.values(), keyvalues.values())
+            txn.execute(sql, qargs)
+        else:
+            src_row = self._simple_select_one_txn(
+                txn, src_table, keyvalues, copy_columns
+            )
+            dest_current_row = self._simple_select_one_txn(
+                txn,
+                into_table,
+                keyvalues,
+                chain(additive_relatives.keys(), copy_columns),
+                allow_none=True,
+            )
+
+            if dest_current_row is None:
+                merged_dict = {**keyvalues, **src_row, **additive_relatives}
+                self._simple_insert_txn(txn, into_table, merged_dict)
+            else:
+                for (key, val) in additive_relatives.items():
+                    src_row[key] = dest_current_row[key] + val
+                self._simple_update_txn(txn, into_table, keyvalues, src_row)
 
     def _update_stats_delta_txn(
         self,
@@ -154,118 +273,61 @@ class StatsStore(StateDeltasStore):
         """
         See L{update_stats_delta}
         Additional Args:
-            absolute_fields (dict[str, int]): Absolute stats values (i.e. not deltas).
+            absolute_fields (dict[str, int]): Absolute current stats values
+                (i.e. not deltas). Does not work with per-slice fields.
         """
         table, id_col = TYPE_TO_TABLE[stats_type]
 
         quantised_ts = self.quantise_stats_time(int(ts))
         end_ts = quantised_ts + self.stats_bucket_size
 
+        abs_field_names = ABSOLUTE_STATS_FIELDS[stats_type]
+        slice_field_names = PER_SLICE_FIELDS[stats_type]
         for field in chain(fields.keys(), absolute_fields.keys()):
-            if (
-                field not in ABSOLUTE_STATS_FIELDS[stats_type]
-                and field not in PER_SLICE_FIELDS[stats_type]
-            ):
+            if field not in abs_field_names and field not in slice_field_names:
                 # guard against potential SQL injection dodginess
                 raise ValueError(
                     "%s is not a recognised field"
                     " for stats type %s" % (field, stats_type)
                 )
 
-        field_sqls = ["%s = %s + ?" % (field, field) for field in fields.keys()]
-        field_values = list(fields.values())
-
-        if absolute_fields is not None:
-            field_sqls += ["%s = ?" % (field,) for field in absolute_fields.keys()]
-            field_values += list(absolute_fields.values())
-
-        if complete_with_stream_id is not None:
-            field_sqls.append("completed_delta_stream_id = ?")
-            field_values.append(complete_with_stream_id)
-
-        # update current row, but only if it is either:
-        # - dirty and complete but not old
-        #       If it is old, we should old-collect it first and retry.
-        # - dirty and incomplete
-        #       Incomplete rows can't be old-collected (as this would commit
-        #       false statistics into the _historical table).
-        #       Instead, their `end_ts` is extended, whilst we wait for them to
-        #       become complete at the hand of the stats regenerator.
-        sql = (
-            "UPDATE %s_current SET end_ts = ?, %s"
-            " WHERE (end_ts IS NOT NULL AND (end_ts >= ? OR completed_delta_stream_id IS NULL))"
-            " AND %s = ?"
-        ) % (table, ", ".join(field_sqls), id_col)
-
-        qargs = [end_ts] + list(field_values) + [end_ts, stats_id]
-
-        txn.execute(sql, qargs)
+        additive_relatives = {
+            key: fields.get(key, 0)
+            for key in abs_field_names
+            if key not in absolute_fields
+        }
 
-        if txn.rowcount > 0:
-            # success.
-            return
+        if absolute_fields is None:
+            absolute_fields = {}
+        elif complete_with_stream_id is not None:
+            absolute_fields = absolute_fields.copy()
 
-        # if we're here, it's because we didn't succeed in updating a stats
-        # row. Why? Let's find out…
+        absolute_fields["completed_delta_stream_id"] = complete_with_stream_id
 
-        current_row = self._simple_select_one_txn(
+        # first upsert the current table
+        self._upsert_with_additive_relatives_txn(
             txn,
             table + "_current",
             {id_col: stats_id},
-            ("end_ts", "completed_delta_stream_id"),
-            allow_none=True,
+            absolute_fields,
+            additive_relatives,
         )
 
-        if current_row is None:
-            # Failure reason: There is no row.
-            # Solution:
-            # we need to insert a row! (insert a dirty, incomplete row)
-            insertee = {
-                id_col: stats_id,
-                "end_ts": end_ts,
-                "start_ts": ts,
-                "completed_delta_stream_id": complete_with_stream_id,
+        if self.has_completed_background_updates():
+            # TODO want to check specifically for stats regenerator, not all
+            #   background updates…
+            # then upsert the historical table.
+            # we don't support absolute_fields for slice_field_names as it makes
+            # no sense.
+            per_slice_additive_relatives = {
+                key: fields.get(key, 0)
+                for key in slice_field_names
             }
-
-            # we assume that, by default, blank fields should be zero.
-            for field_name in ABSOLUTE_STATS_FIELDS[stats_type]:
-                insertee[field_name] = 0
-
-            for field_name in PER_SLICE_FIELDS[stats_type]:
-                insertee[field_name] = 0
-
-            for (field, value) in fields.items():
-                insertee[field] = value
-
-            if absolute_fields is not None:
-                for (field, value) in absolute_fields.items():
-                    insertee[field] = value
-
-            self._simple_insert_txn(txn, table + "_current", insertee)
-
-        elif current_row["end_ts"] is None:
-            # Failure reason: The row is not dirty.
-            # Solution:
-            # update the row, including `start_ts`, to make it dirty.
-            sql = (
-                "UPDATE %s_current SET start_ts = ?, end_ts = ?, %s"
-                " WHERE end_ts IS NULL AND %s = ?"
-            ) % (table, ", ".join(field_sqls), id_col)
-
-            qargs = (
-                [end_ts - self.stats_bucket_size, end_ts]
-                + list(field_values)
-                + [stats_id]
+            self._upsert_copy_from_table_with_additive_relatives_txn(
+                txn,
+                table + "_historical",
+                {id_col: stats_id},
+                per_slice_additive_relatives,
+                table + "_current",
+                abs_field_names
             )
-
-            txn.execute(sql, qargs)
-            if txn.rowcount == 0:
-                raise RuntimeError(
-                    "Should be impossible: No rows updated"
-                    " but all conditions are known to be met."
-                )
-
-        elif current_row["end_ts"] < end_ts:
-            # Failure reason: The row is complete and old.
-            # Solution: We need to perform old collection first
-            raise OldCollectionRequired()