diff options
Diffstat (limited to 'synapse/storage/stats.py')
-rw-r--r-- | synapse/storage/stats.py | 280 |
1 files changed, 171 insertions, 109 deletions
diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py index cebe4cbc57..4cb10dc9fb 100644 --- a/synapse/storage/stats.py +++ b/synapse/storage/stats.py @@ -105,7 +105,6 @@ class StatsStore(StateDeltasStore): desc="update_room_state", ) - @defer.inlineCallbacks def update_stats_delta( self, ts, stats_type, stats_id, fields, complete_with_stream_id=None ): @@ -124,22 +123,142 @@ class StatsStore(StateDeltasStore): row was completed. """ - while True: - try: - res = yield self.runInteraction( - "update_stats_delta", - self._update_stats_delta_txn, - ts, - stats_type, - stats_id, - fields, - complete_with_stream_id=complete_with_stream_id, - ) - return res - except OldCollectionRequired: - # retry after collecting old rows - # TODO (implement later) - raise NotImplementedError("old collection not in this PR") + return self.runInteraction( + "update_stats_delta", + self._update_stats_delta_txn, + ts, + stats_type, + stats_id, + fields, + complete_with_stream_id=complete_with_stream_id, + ) + + def _upsert_with_additive_relatives_txn( + self, txn, table, keyvalues, absolutes, additive_relatives + ): + """ + + Args: + txn: Transaction + table (str): Table name + keyvalues (dict[str, any]): Row-identifying key values + absolutes (dict[str, any]): Absolute (set) fields + additive_relatives (dict[str, int]): Fields that will be added onto + if existing row present. + """ + if self.database_engine.can_native_upsert: + absolute_updates = [ + "%(field)s = EXCLUDED.%(field)s" % {"field": field} + for field in absolutes.keys() + ] + + relative_updates = [ + "%(field)s = EXCLUDED.%(field)s + %(table)s.%(field)s" + % {"table": table, "field": field} + for field in additive_relatives.keys() + ] + + insert_cols = [] + qargs = [table] + + for (key, val) in chain( + keyvalues.items(), absolutes.items(), additive_relatives.items() + ): + insert_cols.append(key) + qargs.append(val) + + sql = """ + INSERT INTO %(table)s (%(insert_cols_cs)s) + VALUES (%(insert_vals_qs)s) + ON CONFLICT DO UPDATE SET %(updates)s + """ % { + "table": table, + "insert_cols_cs": ", ".join(insert_cols), + "insert_vals_qs": ", ".join( + ["?"] * (len(keyvalues) + len(absolutes) + len(additive_relatives)) + ), + "updates": ", ".join(chain(absolute_updates, relative_updates)), + } + + txn.execute(sql, qargs) + else: + retcols = chain(absolutes.keys(), additive_relatives.keys()) + current_row = self._simple_select_one_txn( + txn, table, keyvalues, retcols, allow_none=True + ) + if current_row is None: + merged_dict = {**keyvalues, **absolutes, **additive_relatives} + self._simple_insert_txn(txn, table, merged_dict) + else: + for (key, val) in additive_relatives.items(): + current_row[key] += val + for (key, val) in absolutes.items(): + current_row[key] = val + self._simple_update_one_txn(txn, table, keyvalues, current_row) + + def _upsert_copy_from_table_with_additive_relatives_txn( + self, txn, into_table, keyvalues, additive_relatives, src_table, copy_columns + ): + """ + Args: + txn: Transaction + into_table (str): The destination table to UPSERT the row into + keyvalues (dict[str, any]): Row-identifying key values + additive_relatives (dict[str, any]): Fields that will be added onto + if existing row present. (Must be disjoint from copy_columns.) + src_table (str): The source table to copy from + copy_columns (iterable[str]): The list of columns to copy + """ + if self.database_engine.can_native_upsert: + ins_columns = chain(keyvalues, copy_columns, additive_relatives.keys()) + sel_exprs = chain( + keyvalues, copy_columns, ("?" for _ in additive_relatives) + ) + keyvalues_where = ("%s = ?" % f for f in keyvalues) + + sets_cc = ("%s = EXCLUDED.%s" % (f, f) for f in copy_columns) + sets_ar = ( + "%s = EXCLUDED.%s + %s.%s" % (f, f, into_table, f) for f in copy_columns + ) + + sql = """ + INSERT INTO %(into_table)s (%(ins_columns)s) + SELECT %(sel_exprs)s + FROM %(src_table)s + WHERE %(keyvalues_where)s + ON CONFLICT (%(keyvalues)s) + DO UPDATE SET %(sets)s + """ % { + "into_table": into_table, + "ins_columns": ", ".join(ins_columns), + "sel_exprs": ", ".join(sel_exprs), + "keyvalues_where": ", ".join(keyvalues_where), + "src_table": src_table, + "keyvalues": ", ".join(keyvalues.keys()), + "sets": ", ".join(chain(sets_cc, sets_ar)), + } + + qargs = chain(additive_relatives.values(), keyvalues.values()) + txn.execute(sql, qargs) + else: + src_row = self._simple_select_one_txn( + txn, src_table, keyvalues, copy_columns + ) + dest_current_row = self._simple_select_one_txn( + txn, + into_table, + keyvalues, + chain(additive_relatives.keys(), copy_columns), + allow_none=True, + ) + + if dest_current_row is None: + merged_dict = {**keyvalues, **src_row, **additive_relatives} + self._simple_insert_txn(txn, into_table, merged_dict) + else: + for (key, val) in additive_relatives.items(): + src_row[key] = dest_current_row[key] + val + self._simple_update_txn(txn, into_table, keyvalues, src_row) def _update_stats_delta_txn( self, @@ -154,118 +273,61 @@ class StatsStore(StateDeltasStore): """ See L{update_stats_delta} Additional Args: - absolute_fields (dict[str, int]): Absolute stats values (i.e. not deltas). + absolute_fields (dict[str, int]): Absolute current stats values + (i.e. not deltas). Does not work with per-slice fields. """ table, id_col = TYPE_TO_TABLE[stats_type] quantised_ts = self.quantise_stats_time(int(ts)) end_ts = quantised_ts + self.stats_bucket_size + abs_field_names = ABSOLUTE_STATS_FIELDS[stats_type] + slice_field_names = PER_SLICE_FIELDS[stats_type] for field in chain(fields.keys(), absolute_fields.keys()): - if ( - field not in ABSOLUTE_STATS_FIELDS[stats_type] - and field not in PER_SLICE_FIELDS[stats_type] - ): + if field not in abs_field_names and field not in slice_field_names: # guard against potential SQL injection dodginess raise ValueError( "%s is not a recognised field" " for stats type %s" % (field, stats_type) ) - field_sqls = ["%s = %s + ?" % (field, field) for field in fields.keys()] - field_values = list(fields.values()) - - if absolute_fields is not None: - field_sqls += ["%s = ?" % (field,) for field in absolute_fields.keys()] - field_values += list(absolute_fields.values()) - - if complete_with_stream_id is not None: - field_sqls.append("completed_delta_stream_id = ?") - field_values.append(complete_with_stream_id) - - # update current row, but only if it is either: - # - dirty and complete but not old - # If it is old, we should old-collect it first and retry. - # - dirty and incomplete - # Incomplete rows can't be old-collected (as this would commit - # false statistics into the _historical table). - # Instead, their `end_ts` is extended, whilst we wait for them to - # become complete at the hand of the stats regenerator. - sql = ( - "UPDATE %s_current SET end_ts = ?, %s" - " WHERE (end_ts IS NOT NULL AND (end_ts >= ? OR completed_delta_stream_id IS NULL))" - " AND %s = ?" - ) % (table, ", ".join(field_sqls), id_col) - - qargs = [end_ts] + list(field_values) + [end_ts, stats_id] - - txn.execute(sql, qargs) + additive_relatives = { + key: fields.get(key, 0) + for key in abs_field_names + if key not in absolute_fields + } - if txn.rowcount > 0: - # success. - return + if absolute_fields is None: + absolute_fields = {} + elif complete_with_stream_id is not None: + absolute_fields = absolute_fields.copy() - # if we're here, it's because we didn't succeed in updating a stats - # row. Why? Let's find out… + absolute_fields["completed_delta_stream_id"] = complete_with_stream_id - current_row = self._simple_select_one_txn( + # first upsert the current table + self._upsert_with_additive_relatives_txn( txn, table + "_current", {id_col: stats_id}, - ("end_ts", "completed_delta_stream_id"), - allow_none=True, + absolute_fields, + additive_relatives, ) - if current_row is None: - # Failure reason: There is no row. - # Solution: - # we need to insert a row! (insert a dirty, incomplete row) - insertee = { - id_col: stats_id, - "end_ts": end_ts, - "start_ts": ts, - "completed_delta_stream_id": complete_with_stream_id, + if self.has_completed_background_updates(): + # TODO want to check specifically for stats regenerator, not all + # background updates… + # then upsert the historical table. + # we don't support absolute_fields for slice_field_names as it makes + # no sense. + per_slice_additive_relatives = { + key: fields.get(key, 0) + for key in slice_field_names } - - # we assume that, by default, blank fields should be zero. - for field_name in ABSOLUTE_STATS_FIELDS[stats_type]: - insertee[field_name] = 0 - - for field_name in PER_SLICE_FIELDS[stats_type]: - insertee[field_name] = 0 - - for (field, value) in fields.items(): - insertee[field] = value - - if absolute_fields is not None: - for (field, value) in absolute_fields.items(): - insertee[field] = value - - self._simple_insert_txn(txn, table + "_current", insertee) - - elif current_row["end_ts"] is None: - # Failure reason: The row is not dirty. - # Solution: - # update the row, including `start_ts`, to make it dirty. - sql = ( - "UPDATE %s_current SET start_ts = ?, end_ts = ?, %s" - " WHERE end_ts IS NULL AND %s = ?" - ) % (table, ", ".join(field_sqls), id_col) - - qargs = ( - [end_ts - self.stats_bucket_size, end_ts] - + list(field_values) - + [stats_id] + self._upsert_copy_from_table_with_additive_relatives_txn( + txn, + table + "_historical", + {id_col: stats_id}, + per_slice_additive_relatives, + table + "_current", + abs_field_names ) - - txn.execute(sql, qargs) - if txn.rowcount == 0: - raise RuntimeError( - "Should be impossible: No rows updated" - " but all conditions are known to be met." - ) - - elif current_row["end_ts"] < end_ts: - # Failure reason: The row is complete and old. - # Solution: We need to perform old collection first - raise OldCollectionRequired() |