summary refs log tree commit diff
path: root/synapse/storage
diff options
authorOlivier Wilkinson (reivilibre) <>2019-08-22 15:40:58 +0100
committerOlivier Wilkinson (reivilibre) <>2019-08-22 15:40:58 +0100
commit7b657f1148fa10234d52d333ff176969f296aa0f (patch)
treeecb35a61e907149b7947d8e874c060465db7d7b0 /synapse/storage
parentRemove needless defaults. (diff)
Simplify table structure
This obviates the need for old collection, but comes at the minor cost
of not being able to track historical stats or per-slice fields until
after the statistics regenerator is finished.

Signed-off-by: Olivier Wilkinson (reivilibre) <>
Diffstat (limited to 'synapse/storage')
3 files changed, 175 insertions, 138 deletions
diff --git a/synapse/storage/schema/delta/56/stats_separated1.sql b/synapse/storage/schema/delta/56/stats_separated1.sql
index 1e17eae226..d7418fdf1e 100644
--- a/synapse/storage/schema/delta/56/stats_separated1.sql
+++ b/synapse/storage/schema/delta/56/stats_separated1.sql
@@ -63,14 +63,10 @@ INSERT INTO stats_incremental_position (
 ) VALUES (NULL, NULL, NULL, (0 = 1)), (NULL, NULL, NULL, (1 = 1));
 -- represents PRESENT room statistics for a room
+-- only holds absolute fields
 CREATE TABLE IF NOT EXISTS room_stats_current (
-    -- These starts cover the time from start_ts...end_ts (in seconds).
-    -- Note that end_ts is quantised, and start_ts usually so.
-    start_ts BIGINT,
-    end_ts BIGINT,
     current_state_events INT NOT NULL,
     total_events INT NOT NULL,
     joined_members INT NOT NULL,
@@ -82,8 +78,6 @@ CREATE TABLE IF NOT EXISTS room_stats_current (
     -- If initial stats regen has been performed: the maximum delta stream
     --  position that this row takes into account.
     completed_delta_stream_id BIGINT,
-    CONSTRAINT timestamp_nullity_equality CHECK ((start_ts IS NULL) = (end_ts IS NULL))
@@ -91,7 +85,7 @@ CREATE TABLE IF NOT EXISTS room_stats_current (
 CREATE TABLE IF NOT EXISTS room_stats_historical (
     room_id TEXT NOT NULL,
     -- These stats cover the time from (end_ts - bucket_size)...end_ts (in seconds).
-    -- Note that end_ts is quantised, and start_ts usually so.
+    -- Note that end_ts is quantised.
     end_ts BIGINT NOT NULL,
     bucket_size INT NOT NULL,
@@ -113,13 +107,10 @@ CREATE INDEX IF NOT EXISTS room_stats_historical_end_ts ON room_stats_historical
 -- represents PRESENT statistics for a user
+-- only holds absolute fields
 CREATE TABLE IF NOT EXISTS user_stats_current (
-    -- The timestamp that represents the start of the
-    start_ts BIGINT,
-    end_ts BIGINT,
     public_rooms INT NOT NULL,
     private_rooms INT NOT NULL,
diff --git a/synapse/storage/schema/delta/56/ b/synapse/storage/schema/delta/56/
index 049867fa3e..942d240010 100644
--- a/synapse/storage/schema/delta/56/
+++ b/synapse/storage/schema/delta/56/
@@ -34,29 +34,13 @@ def _run_create_generic(stats_type, cursor, database_engine):
         # partial index are not a big concern.
-                CREATE INDEX IF NOT EXISTS %s_stats_current_dirty
-                    ON %s_stats_current (end_ts);
-            """
-            % (stats_type, stats_type)
-        )
-        cursor.execute(
-            """
                 CREATE INDEX IF NOT EXISTS %s_stats_not_complete
                     ON %s_stats_current (completed_delta_stream_id, %s_id);
             % (stats_type, stats_type, stats_type)
     elif isinstance(database_engine, PostgresEngine):
-        # This partial index helps us with finding dirty stats rows
-        cursor.execute(
-            """
-                CREATE INDEX IF NOT EXISTS %s_stats_current_dirty
-                    ON %s_stats_current (end_ts)
-                    WHERE end_ts IS NOT NULL;
-            """
-            % (stats_type, stats_type)
-        )
-        # This partial index helps us with old collection
+        # This partial index helps us with finding incomplete stats rows
                 CREATE INDEX IF NOT EXISTS %s_stats_not_complete
diff --git a/synapse/storage/ b/synapse/storage/
index cebe4cbc57..4cb10dc9fb 100644
--- a/synapse/storage/
+++ b/synapse/storage/
@@ -105,7 +105,6 @@ class StatsStore(StateDeltasStore):
-    @defer.inlineCallbacks
     def update_stats_delta(
         self, ts, stats_type, stats_id, fields, complete_with_stream_id=None
@@ -124,22 +123,142 @@ class StatsStore(StateDeltasStore):
                 row was completed.
-        while True:
-            try:
-                res = yield self.runInteraction(
-                    "update_stats_delta",
-                    self._update_stats_delta_txn,
-                    ts,
-                    stats_type,
-                    stats_id,
-                    fields,
-                    complete_with_stream_id=complete_with_stream_id,
-                )
-                return res
-            except OldCollectionRequired:
-                # retry after collecting old rows
-                # TODO (implement later)
-                raise NotImplementedError("old collection not in this PR")
+        return self.runInteraction(
+            "update_stats_delta",
+            self._update_stats_delta_txn,
+            ts,
+            stats_type,
+            stats_id,
+            fields,
+            complete_with_stream_id=complete_with_stream_id,
+        )
+    def _upsert_with_additive_relatives_txn(
+        self, txn, table, keyvalues, absolutes, additive_relatives
+    ):
+        """
+        Args:
+            txn: Transaction
+            table (str): Table name
+            keyvalues (dict[str, any]): Row-identifying key values
+            absolutes (dict[str, any]): Absolute (set) fields
+            additive_relatives (dict[str, int]): Fields that will be added onto
+                if existing row present.
+        """
+        if self.database_engine.can_native_upsert:
+            absolute_updates = [
+                "%(field)s = EXCLUDED.%(field)s" % {"field": field}
+                for field in absolutes.keys()
+            ]
+            relative_updates = [
+                "%(field)s = EXCLUDED.%(field)s + %(table)s.%(field)s"
+                % {"table": table, "field": field}
+                for field in additive_relatives.keys()
+            ]
+            insert_cols = []
+            qargs = [table]
+            for (key, val) in chain(
+                keyvalues.items(), absolutes.items(), additive_relatives.items()
+            ):
+                insert_cols.append(key)
+                qargs.append(val)
+            sql = """
+                INSERT INTO %(table)s (%(insert_cols_cs)s)
+                VALUES (%(insert_vals_qs)s)
+                ON CONFLICT DO UPDATE SET %(updates)s
+            """ % {
+                "table": table,
+                "insert_cols_cs": ", ".join(insert_cols),
+                "insert_vals_qs": ", ".join(
+                    ["?"] * (len(keyvalues) + len(absolutes) + len(additive_relatives))
+                ),
+                "updates": ", ".join(chain(absolute_updates, relative_updates)),
+            }
+            txn.execute(sql, qargs)
+        else:
+            retcols = chain(absolutes.keys(), additive_relatives.keys())
+            current_row = self._simple_select_one_txn(
+                txn, table, keyvalues, retcols, allow_none=True
+            )
+            if current_row is None:
+                merged_dict = {**keyvalues, **absolutes, **additive_relatives}
+                self._simple_insert_txn(txn, table, merged_dict)
+            else:
+                for (key, val) in additive_relatives.items():
+                    current_row[key] += val
+                for (key, val) in absolutes.items():
+                    current_row[key] = val
+                self._simple_update_one_txn(txn, table, keyvalues, current_row)
+    def _upsert_copy_from_table_with_additive_relatives_txn(
+        self, txn, into_table, keyvalues, additive_relatives, src_table, copy_columns
+    ):
+        """
+        Args:
+             txn: Transaction
+             into_table (str): The destination table to UPSERT the row into
+             keyvalues (dict[str, any]): Row-identifying key values
+             additive_relatives (dict[str, any]): Fields that will be added onto
+                if existing row present. (Must be disjoint from copy_columns.)
+             src_table (str): The source table to copy from
+             copy_columns (iterable[str]): The list of columns to copy
+        """
+        if self.database_engine.can_native_upsert:
+            ins_columns = chain(keyvalues, copy_columns, additive_relatives.keys())
+            sel_exprs = chain(
+                keyvalues, copy_columns, ("?" for _ in additive_relatives)
+            )
+            keyvalues_where = ("%s = ?" % f for f in keyvalues)
+            sets_cc = ("%s = EXCLUDED.%s" % (f, f) for f in copy_columns)
+            sets_ar = (
+                "%s = EXCLUDED.%s + %s.%s" % (f, f, into_table, f) for f in copy_columns
+            )
+            sql = """
+                INSERT INTO %(into_table)s (%(ins_columns)s)
+                SELECT %(sel_exprs)s
+                FROM %(src_table)s
+                WHERE %(keyvalues_where)s
+                ON CONFLICT (%(keyvalues)s)
+                DO UPDATE SET %(sets)s
+            """ % {
+                "into_table": into_table,
+                "ins_columns": ", ".join(ins_columns),
+                "sel_exprs": ", ".join(sel_exprs),
+                "keyvalues_where": ", ".join(keyvalues_where),
+                "src_table": src_table,
+                "keyvalues": ", ".join(keyvalues.keys()),
+                "sets": ", ".join(chain(sets_cc, sets_ar)),
+            }
+            qargs = chain(additive_relatives.values(), keyvalues.values())
+            txn.execute(sql, qargs)
+        else:
+            src_row = self._simple_select_one_txn(
+                txn, src_table, keyvalues, copy_columns
+            )
+            dest_current_row = self._simple_select_one_txn(
+                txn,
+                into_table,
+                keyvalues,
+                chain(additive_relatives.keys(), copy_columns),
+                allow_none=True,
+            )
+            if dest_current_row is None:
+                merged_dict = {**keyvalues, **src_row, **additive_relatives}
+                self._simple_insert_txn(txn, into_table, merged_dict)
+            else:
+                for (key, val) in additive_relatives.items():
+                    src_row[key] = dest_current_row[key] + val
+                self._simple_update_txn(txn, into_table, keyvalues, src_row)
     def _update_stats_delta_txn(
@@ -154,118 +273,61 @@ class StatsStore(StateDeltasStore):
         See L{update_stats_delta}
         Additional Args:
-            absolute_fields (dict[str, int]): Absolute stats values (i.e. not deltas).
+            absolute_fields (dict[str, int]): Absolute current stats values
+                (i.e. not deltas). Does not work with per-slice fields.
         table, id_col = TYPE_TO_TABLE[stats_type]
         quantised_ts = self.quantise_stats_time(int(ts))
         end_ts = quantised_ts + self.stats_bucket_size
+        abs_field_names = ABSOLUTE_STATS_FIELDS[stats_type]
+        slice_field_names = PER_SLICE_FIELDS[stats_type]
         for field in chain(fields.keys(), absolute_fields.keys()):
-            if (
-                field not in ABSOLUTE_STATS_FIELDS[stats_type]
-                and field not in PER_SLICE_FIELDS[stats_type]
-            ):
+            if field not in abs_field_names and field not in slice_field_names:
                 # guard against potential SQL injection dodginess
                 raise ValueError(
                     "%s is not a recognised field"
                     " for stats type %s" % (field, stats_type)
-        field_sqls = ["%s = %s + ?" % (field, field) for field in fields.keys()]
-        field_values = list(fields.values())
-        if absolute_fields is not None:
-            field_sqls += ["%s = ?" % (field,) for field in absolute_fields.keys()]
-            field_values += list(absolute_fields.values())
-        if complete_with_stream_id is not None:
-            field_sqls.append("completed_delta_stream_id = ?")
-            field_values.append(complete_with_stream_id)
-        # update current row, but only if it is either:
-        # - dirty and complete but not old
-        #       If it is old, we should old-collect it first and retry.
-        # - dirty and incomplete
-        #       Incomplete rows can't be old-collected (as this would commit
-        #       false statistics into the _historical table).
-        #       Instead, their `end_ts` is extended, whilst we wait for them to
-        #       become complete at the hand of the stats regenerator.
-        sql = (
-            "UPDATE %s_current SET end_ts = ?, %s"
-            " WHERE (end_ts IS NOT NULL AND (end_ts >= ? OR completed_delta_stream_id IS NULL))"
-            " AND %s = ?"
-        ) % (table, ", ".join(field_sqls), id_col)
-        qargs = [end_ts] + list(field_values) + [end_ts, stats_id]
-        txn.execute(sql, qargs)
+        additive_relatives = {
+            key: fields.get(key, 0)
+            for key in abs_field_names
+            if key not in absolute_fields
+        }
-        if txn.rowcount > 0:
-            # success.
-            return
+        if absolute_fields is None:
+            absolute_fields = {}
+        elif complete_with_stream_id is not None:
+            absolute_fields = absolute_fields.copy()
-        # if we're here, it's because we didn't succeed in updating a stats
-        # row. Why? Let's find out…
+        absolute_fields["completed_delta_stream_id"] = complete_with_stream_id
-        current_row = self._simple_select_one_txn(
+        # first upsert the current table
+        self._upsert_with_additive_relatives_txn(
             table + "_current",
             {id_col: stats_id},
-            ("end_ts", "completed_delta_stream_id"),
-            allow_none=True,
+            absolute_fields,
+            additive_relatives,
-        if current_row is None:
-            # Failure reason: There is no row.
-            # Solution:
-            # we need to insert a row! (insert a dirty, incomplete row)
-            insertee = {
-                id_col: stats_id,
-                "end_ts": end_ts,
-                "start_ts": ts,
-                "completed_delta_stream_id": complete_with_stream_id,
+        if self.has_completed_background_updates():
+            # TODO want to check specifically for stats regenerator, not all
+            #   background updates…
+            # then upsert the historical table.
+            # we don't support absolute_fields for slice_field_names as it makes
+            # no sense.
+            per_slice_additive_relatives = {
+                key: fields.get(key, 0)
+                for key in slice_field_names
-            # we assume that, by default, blank fields should be zero.
-            for field_name in ABSOLUTE_STATS_FIELDS[stats_type]:
-                insertee[field_name] = 0
-            for field_name in PER_SLICE_FIELDS[stats_type]:
-                insertee[field_name] = 0
-            for (field, value) in fields.items():
-                insertee[field] = value
-            if absolute_fields is not None:
-                for (field, value) in absolute_fields.items():
-                    insertee[field] = value
-            self._simple_insert_txn(txn, table + "_current", insertee)
-        elif current_row["end_ts"] is None:
-            # Failure reason: The row is not dirty.
-            # Solution:
-            # update the row, including `start_ts`, to make it dirty.
-            sql = (
-                "UPDATE %s_current SET start_ts = ?, end_ts = ?, %s"
-                " WHERE end_ts IS NULL AND %s = ?"
-            ) % (table, ", ".join(field_sqls), id_col)
-            qargs = (
-                [end_ts - self.stats_bucket_size, end_ts]
-                + list(field_values)
-                + [stats_id]
+            self._upsert_copy_from_table_with_additive_relatives_txn(
+                txn,
+                table + "_historical",
+                {id_col: stats_id},
+                per_slice_additive_relatives,
+                table + "_current",
+                abs_field_names
-            txn.execute(sql, qargs)
-            if txn.rowcount == 0:
-                raise RuntimeError(
-                    "Should be impossible: No rows updated"
-                    " but all conditions are known to be met."
-                )
-        elif current_row["end_ts"] < end_ts:
-            # Failure reason: The row is complete and old.
-            # Solution: We need to perform old collection first
-            raise OldCollectionRequired()