From 1819563640e1be839c348e18afc1b59c7b3b8c9c Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Tue, 20 Aug 2019 14:27:47 +0100 Subject: Ack, isort! Signed-off-by: Olivier Wilkinson (reivilibre) --- synapse/storage/stats.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'synapse/storage/stats.py') diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py index e8b1ce240b..4e0a3d4f6e 100644 --- a/synapse/storage/stats.py +++ b/synapse/storage/stats.py @@ -16,9 +16,10 @@ import logging -from synapse.storage.state_deltas import StateDeltasStore from twisted.internet import defer +from synapse.storage.state_deltas import StateDeltasStore + logger = logging.getLogger(__name__) # these fields track absolutes (e.g. total number of rooms on the server) -- cgit 1.5.1 From b5573c0ffb97059f672d465be7dd38c94854411d Mon Sep 17 00:00:00 2001 From: reivilibre <38398653+reivilibre@users.noreply.github.com> Date: Tue, 20 Aug 2019 15:02:49 +0100 Subject: Update synapse/storage/stats.py Co-Authored-By: Erik Johnston --- synapse/storage/stats.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/storage/stats.py') diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py index 4e0a3d4f6e..095924cae6 100644 --- a/synapse/storage/stats.py +++ b/synapse/storage/stats.py @@ -68,7 +68,7 @@ class StatsStore(StateDeltasStore): ts: the timestamp to quantise, in seconds since the Unix Epoch Returns: - a timestamp which + int: a timestamp which - is divisible by the bucket size; - is no later than `ts`; and - is the largest such timestamp. -- cgit 1.5.1 From 4a97eef0dc49ee8f3c446221f0fcbb0e65ece113 Mon Sep 17 00:00:00 2001 From: reivilibre <38398653+reivilibre@users.noreply.github.com> Date: Tue, 20 Aug 2019 15:12:21 +0100 Subject: Update synapse/storage/stats.py Co-Authored-By: Erik Johnston --- synapse/storage/stats.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/storage/stats.py') diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py index 095924cae6..0445b97b4a 100644 --- a/synapse/storage/stats.py +++ b/synapse/storage/stats.py @@ -65,7 +65,7 @@ class StatsStore(StateDeltasStore): Quantises a timestamp to be a multiple of the bucket size. Args: - ts: the timestamp to quantise, in seconds since the Unix Epoch + ts (int): the timestamp to quantise, in seconds since the Unix Epoch Returns: int: a timestamp which -- cgit 1.5.1 From 981c6cf5442bfb16c177f995deedeb3ec44bf5fb Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Tue, 20 Aug 2019 15:41:10 +0100 Subject: Sanitise accepted fields in `_update_stats_delta_txn` Signed-off-by: Olivier Wilkinson (reivilibre) --- synapse/storage/stats.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) (limited to 'synapse/storage/stats.py') diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py index 0445b97b4a..a372f35eae 100644 --- a/synapse/storage/stats.py +++ b/synapse/storage/stats.py @@ -15,6 +15,7 @@ # limitations under the License. import logging +from itertools import chain from twisted.internet import defer @@ -160,6 +161,17 @@ class StatsStore(StateDeltasStore): quantised_ts = self.quantise_stats_time(int(ts)) end_ts = quantised_ts + self.stats_bucket_size + for field in chain(fields.keys(), absolute_fields.keys()): + if ( + field not in ABSOLUTE_STATS_FIELDS[stats_type] + and field not in PER_SLICE_FIELDS[stats_type] + ): + # guard against potential SQL injection dodginess + raise ValueError( + "%s is not a recognised field" + " for stats type %s" % (field, stats_type) + ) + field_sqls = ["%s = %s + ?" % (field, field) for field in fields.keys()] field_values = list(fields.values()) -- cgit 1.5.1 From 977310ee2767e4edaa20e4a2216be359a7eb8002 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Tue, 20 Aug 2019 15:49:00 +0100 Subject: Clarify `_update_stats_delta_txn` Signed-off-by: Olivier Wilkinson (reivilibre) --- synapse/storage/stats.py | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) (limited to 'synapse/storage/stats.py') diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py index a372f35eae..cebe4cbc57 100644 --- a/synapse/storage/stats.py +++ b/synapse/storage/stats.py @@ -183,6 +183,14 @@ class StatsStore(StateDeltasStore): field_sqls.append("completed_delta_stream_id = ?") field_values.append(complete_with_stream_id) + # update current row, but only if it is either: + # - dirty and complete but not old + # If it is old, we should old-collect it first and retry. + # - dirty and incomplete + # Incomplete rows can't be old-collected (as this would commit + # false statistics into the _historical table). + # Instead, their `end_ts` is extended, whilst we wait for them to + # become complete at the hand of the stats regenerator. sql = ( "UPDATE %s_current SET end_ts = ?, %s" " WHERE (end_ts IS NOT NULL AND (end_ts >= ? OR completed_delta_stream_id IS NULL))" @@ -209,6 +217,8 @@ class StatsStore(StateDeltasStore): ) if current_row is None: + # Failure reason: There is no row. + # Solution: # we need to insert a row! (insert a dirty, incomplete row) insertee = { id_col: stats_id, @@ -234,7 +244,9 @@ class StatsStore(StateDeltasStore): self._simple_insert_txn(txn, table + "_current", insertee) elif current_row["end_ts"] is None: - # update the row, including start_ts + # Failure reason: The row is not dirty. + # Solution: + # update the row, including `start_ts`, to make it dirty. sql = ( "UPDATE %s_current SET start_ts = ?, end_ts = ?, %s" " WHERE end_ts IS NULL AND %s = ?" @@ -254,5 +266,6 @@ class StatsStore(StateDeltasStore): ) elif current_row["end_ts"] < end_ts: - # we need to perform old collection first + # Failure reason: The row is complete and old. + # Solution: We need to perform old collection first raise OldCollectionRequired() -- cgit 1.5.1 From 7b657f1148fa10234d52d333ff176969f296aa0f Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Thu, 22 Aug 2019 15:40:58 +0100 Subject: Simplify table structure This obviates the need for old collection, but comes at the minor cost of not being able to track historical stats or per-slice fields until after the statistics regenerator is finished. Signed-off-by: Olivier Wilkinson (reivilibre) --- .../storage/schema/delta/56/stats_separated1.sql | 15 +- .../storage/schema/delta/56/stats_separated2.py | 18 +- synapse/storage/stats.py | 280 +++++++++++++-------- 3 files changed, 175 insertions(+), 138 deletions(-) (limited to 'synapse/storage/stats.py') diff --git a/synapse/storage/schema/delta/56/stats_separated1.sql b/synapse/storage/schema/delta/56/stats_separated1.sql index 1e17eae226..d7418fdf1e 100644 --- a/synapse/storage/schema/delta/56/stats_separated1.sql +++ b/synapse/storage/schema/delta/56/stats_separated1.sql @@ -63,14 +63,10 @@ INSERT INTO stats_incremental_position ( ) VALUES (NULL, NULL, NULL, (0 = 1)), (NULL, NULL, NULL, (1 = 1)); -- represents PRESENT room statistics for a room +-- only holds absolute fields CREATE TABLE IF NOT EXISTS room_stats_current ( room_id TEXT NOT NULL PRIMARY KEY, - -- These starts cover the time from start_ts...end_ts (in seconds). - -- Note that end_ts is quantised, and start_ts usually so. - start_ts BIGINT, - end_ts BIGINT, - current_state_events INT NOT NULL, total_events INT NOT NULL, joined_members INT NOT NULL, @@ -82,8 +78,6 @@ CREATE TABLE IF NOT EXISTS room_stats_current ( -- If initial stats regen has been performed: the maximum delta stream -- position that this row takes into account. completed_delta_stream_id BIGINT, - - CONSTRAINT timestamp_nullity_equality CHECK ((start_ts IS NULL) = (end_ts IS NULL)) ); @@ -91,7 +85,7 @@ CREATE TABLE IF NOT EXISTS room_stats_current ( CREATE TABLE IF NOT EXISTS room_stats_historical ( room_id TEXT NOT NULL, -- These stats cover the time from (end_ts - bucket_size)...end_ts (in seconds). - -- Note that end_ts is quantised, and start_ts usually so. + -- Note that end_ts is quantised. end_ts BIGINT NOT NULL, bucket_size INT NOT NULL, @@ -113,13 +107,10 @@ CREATE INDEX IF NOT EXISTS room_stats_historical_end_ts ON room_stats_historical -- represents PRESENT statistics for a user +-- only holds absolute fields CREATE TABLE IF NOT EXISTS user_stats_current ( user_id TEXT NOT NULL PRIMARY KEY, - -- The timestamp that represents the start of the - start_ts BIGINT, - end_ts BIGINT, - public_rooms INT NOT NULL, private_rooms INT NOT NULL, diff --git a/synapse/storage/schema/delta/56/stats_separated2.py b/synapse/storage/schema/delta/56/stats_separated2.py index 049867fa3e..942d240010 100644 --- a/synapse/storage/schema/delta/56/stats_separated2.py +++ b/synapse/storage/schema/delta/56/stats_separated2.py @@ -32,13 +32,6 @@ def _run_create_generic(stats_type, cursor, database_engine): # It's also the case that SQLite is only likely to be used in small # deployments or testing, where the optimisations gained by use of a # partial index are not a big concern. - cursor.execute( - """ - CREATE INDEX IF NOT EXISTS %s_stats_current_dirty - ON %s_stats_current (end_ts); - """ - % (stats_type, stats_type) - ) cursor.execute( """ CREATE INDEX IF NOT EXISTS %s_stats_not_complete @@ -47,16 +40,7 @@ def _run_create_generic(stats_type, cursor, database_engine): % (stats_type, stats_type, stats_type) ) elif isinstance(database_engine, PostgresEngine): - # This partial index helps us with finding dirty stats rows - cursor.execute( - """ - CREATE INDEX IF NOT EXISTS %s_stats_current_dirty - ON %s_stats_current (end_ts) - WHERE end_ts IS NOT NULL; - """ - % (stats_type, stats_type) - ) - # This partial index helps us with old collection + # This partial index helps us with finding incomplete stats rows cursor.execute( """ CREATE INDEX IF NOT EXISTS %s_stats_not_complete diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py index cebe4cbc57..4cb10dc9fb 100644 --- a/synapse/storage/stats.py +++ b/synapse/storage/stats.py @@ -105,7 +105,6 @@ class StatsStore(StateDeltasStore): desc="update_room_state", ) - @defer.inlineCallbacks def update_stats_delta( self, ts, stats_type, stats_id, fields, complete_with_stream_id=None ): @@ -124,22 +123,142 @@ class StatsStore(StateDeltasStore): row was completed. """ - while True: - try: - res = yield self.runInteraction( - "update_stats_delta", - self._update_stats_delta_txn, - ts, - stats_type, - stats_id, - fields, - complete_with_stream_id=complete_with_stream_id, - ) - return res - except OldCollectionRequired: - # retry after collecting old rows - # TODO (implement later) - raise NotImplementedError("old collection not in this PR") + return self.runInteraction( + "update_stats_delta", + self._update_stats_delta_txn, + ts, + stats_type, + stats_id, + fields, + complete_with_stream_id=complete_with_stream_id, + ) + + def _upsert_with_additive_relatives_txn( + self, txn, table, keyvalues, absolutes, additive_relatives + ): + """ + + Args: + txn: Transaction + table (str): Table name + keyvalues (dict[str, any]): Row-identifying key values + absolutes (dict[str, any]): Absolute (set) fields + additive_relatives (dict[str, int]): Fields that will be added onto + if existing row present. + """ + if self.database_engine.can_native_upsert: + absolute_updates = [ + "%(field)s = EXCLUDED.%(field)s" % {"field": field} + for field in absolutes.keys() + ] + + relative_updates = [ + "%(field)s = EXCLUDED.%(field)s + %(table)s.%(field)s" + % {"table": table, "field": field} + for field in additive_relatives.keys() + ] + + insert_cols = [] + qargs = [table] + + for (key, val) in chain( + keyvalues.items(), absolutes.items(), additive_relatives.items() + ): + insert_cols.append(key) + qargs.append(val) + + sql = """ + INSERT INTO %(table)s (%(insert_cols_cs)s) + VALUES (%(insert_vals_qs)s) + ON CONFLICT DO UPDATE SET %(updates)s + """ % { + "table": table, + "insert_cols_cs": ", ".join(insert_cols), + "insert_vals_qs": ", ".join( + ["?"] * (len(keyvalues) + len(absolutes) + len(additive_relatives)) + ), + "updates": ", ".join(chain(absolute_updates, relative_updates)), + } + + txn.execute(sql, qargs) + else: + retcols = chain(absolutes.keys(), additive_relatives.keys()) + current_row = self._simple_select_one_txn( + txn, table, keyvalues, retcols, allow_none=True + ) + if current_row is None: + merged_dict = {**keyvalues, **absolutes, **additive_relatives} + self._simple_insert_txn(txn, table, merged_dict) + else: + for (key, val) in additive_relatives.items(): + current_row[key] += val + for (key, val) in absolutes.items(): + current_row[key] = val + self._simple_update_one_txn(txn, table, keyvalues, current_row) + + def _upsert_copy_from_table_with_additive_relatives_txn( + self, txn, into_table, keyvalues, additive_relatives, src_table, copy_columns + ): + """ + Args: + txn: Transaction + into_table (str): The destination table to UPSERT the row into + keyvalues (dict[str, any]): Row-identifying key values + additive_relatives (dict[str, any]): Fields that will be added onto + if existing row present. (Must be disjoint from copy_columns.) + src_table (str): The source table to copy from + copy_columns (iterable[str]): The list of columns to copy + """ + if self.database_engine.can_native_upsert: + ins_columns = chain(keyvalues, copy_columns, additive_relatives.keys()) + sel_exprs = chain( + keyvalues, copy_columns, ("?" for _ in additive_relatives) + ) + keyvalues_where = ("%s = ?" % f for f in keyvalues) + + sets_cc = ("%s = EXCLUDED.%s" % (f, f) for f in copy_columns) + sets_ar = ( + "%s = EXCLUDED.%s + %s.%s" % (f, f, into_table, f) for f in copy_columns + ) + + sql = """ + INSERT INTO %(into_table)s (%(ins_columns)s) + SELECT %(sel_exprs)s + FROM %(src_table)s + WHERE %(keyvalues_where)s + ON CONFLICT (%(keyvalues)s) + DO UPDATE SET %(sets)s + """ % { + "into_table": into_table, + "ins_columns": ", ".join(ins_columns), + "sel_exprs": ", ".join(sel_exprs), + "keyvalues_where": ", ".join(keyvalues_where), + "src_table": src_table, + "keyvalues": ", ".join(keyvalues.keys()), + "sets": ", ".join(chain(sets_cc, sets_ar)), + } + + qargs = chain(additive_relatives.values(), keyvalues.values()) + txn.execute(sql, qargs) + else: + src_row = self._simple_select_one_txn( + txn, src_table, keyvalues, copy_columns + ) + dest_current_row = self._simple_select_one_txn( + txn, + into_table, + keyvalues, + chain(additive_relatives.keys(), copy_columns), + allow_none=True, + ) + + if dest_current_row is None: + merged_dict = {**keyvalues, **src_row, **additive_relatives} + self._simple_insert_txn(txn, into_table, merged_dict) + else: + for (key, val) in additive_relatives.items(): + src_row[key] = dest_current_row[key] + val + self._simple_update_txn(txn, into_table, keyvalues, src_row) def _update_stats_delta_txn( self, @@ -154,118 +273,61 @@ class StatsStore(StateDeltasStore): """ See L{update_stats_delta} Additional Args: - absolute_fields (dict[str, int]): Absolute stats values (i.e. not deltas). + absolute_fields (dict[str, int]): Absolute current stats values + (i.e. not deltas). Does not work with per-slice fields. """ table, id_col = TYPE_TO_TABLE[stats_type] quantised_ts = self.quantise_stats_time(int(ts)) end_ts = quantised_ts + self.stats_bucket_size + abs_field_names = ABSOLUTE_STATS_FIELDS[stats_type] + slice_field_names = PER_SLICE_FIELDS[stats_type] for field in chain(fields.keys(), absolute_fields.keys()): - if ( - field not in ABSOLUTE_STATS_FIELDS[stats_type] - and field not in PER_SLICE_FIELDS[stats_type] - ): + if field not in abs_field_names and field not in slice_field_names: # guard against potential SQL injection dodginess raise ValueError( "%s is not a recognised field" " for stats type %s" % (field, stats_type) ) - field_sqls = ["%s = %s + ?" % (field, field) for field in fields.keys()] - field_values = list(fields.values()) - - if absolute_fields is not None: - field_sqls += ["%s = ?" % (field,) for field in absolute_fields.keys()] - field_values += list(absolute_fields.values()) - - if complete_with_stream_id is not None: - field_sqls.append("completed_delta_stream_id = ?") - field_values.append(complete_with_stream_id) - - # update current row, but only if it is either: - # - dirty and complete but not old - # If it is old, we should old-collect it first and retry. - # - dirty and incomplete - # Incomplete rows can't be old-collected (as this would commit - # false statistics into the _historical table). - # Instead, their `end_ts` is extended, whilst we wait for them to - # become complete at the hand of the stats regenerator. - sql = ( - "UPDATE %s_current SET end_ts = ?, %s" - " WHERE (end_ts IS NOT NULL AND (end_ts >= ? OR completed_delta_stream_id IS NULL))" - " AND %s = ?" - ) % (table, ", ".join(field_sqls), id_col) - - qargs = [end_ts] + list(field_values) + [end_ts, stats_id] - - txn.execute(sql, qargs) + additive_relatives = { + key: fields.get(key, 0) + for key in abs_field_names + if key not in absolute_fields + } - if txn.rowcount > 0: - # success. - return + if absolute_fields is None: + absolute_fields = {} + elif complete_with_stream_id is not None: + absolute_fields = absolute_fields.copy() - # if we're here, it's because we didn't succeed in updating a stats - # row. Why? Let's find out… + absolute_fields["completed_delta_stream_id"] = complete_with_stream_id - current_row = self._simple_select_one_txn( + # first upsert the current table + self._upsert_with_additive_relatives_txn( txn, table + "_current", {id_col: stats_id}, - ("end_ts", "completed_delta_stream_id"), - allow_none=True, + absolute_fields, + additive_relatives, ) - if current_row is None: - # Failure reason: There is no row. - # Solution: - # we need to insert a row! (insert a dirty, incomplete row) - insertee = { - id_col: stats_id, - "end_ts": end_ts, - "start_ts": ts, - "completed_delta_stream_id": complete_with_stream_id, + if self.has_completed_background_updates(): + # TODO want to check specifically for stats regenerator, not all + # background updates… + # then upsert the historical table. + # we don't support absolute_fields for slice_field_names as it makes + # no sense. + per_slice_additive_relatives = { + key: fields.get(key, 0) + for key in slice_field_names } - - # we assume that, by default, blank fields should be zero. - for field_name in ABSOLUTE_STATS_FIELDS[stats_type]: - insertee[field_name] = 0 - - for field_name in PER_SLICE_FIELDS[stats_type]: - insertee[field_name] = 0 - - for (field, value) in fields.items(): - insertee[field] = value - - if absolute_fields is not None: - for (field, value) in absolute_fields.items(): - insertee[field] = value - - self._simple_insert_txn(txn, table + "_current", insertee) - - elif current_row["end_ts"] is None: - # Failure reason: The row is not dirty. - # Solution: - # update the row, including `start_ts`, to make it dirty. - sql = ( - "UPDATE %s_current SET start_ts = ?, end_ts = ?, %s" - " WHERE end_ts IS NULL AND %s = ?" - ) % (table, ", ".join(field_sqls), id_col) - - qargs = ( - [end_ts - self.stats_bucket_size, end_ts] - + list(field_values) - + [stats_id] + self._upsert_copy_from_table_with_additive_relatives_txn( + txn, + table + "_historical", + {id_col: stats_id}, + per_slice_additive_relatives, + table + "_current", + abs_field_names ) - - txn.execute(sql, qargs) - if txn.rowcount == 0: - raise RuntimeError( - "Should be impossible: No rows updated" - " but all conditions are known to be met." - ) - - elif current_row["end_ts"] < end_ts: - # Failure reason: The row is complete and old. - # Solution: We need to perform old collection first - raise OldCollectionRequired() -- cgit 1.5.1 From 79252d1c83f9d230f0e2320cc0a40493e22ad653 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Thu, 22 Aug 2019 16:10:32 +0100 Subject: Fix up historical stats support. Signed-off-by: Olivier Wilkinson (reivilibre) --- synapse/storage/stats.py | 43 ++++++++++++++++++++++++++++++++----------- 1 file changed, 32 insertions(+), 11 deletions(-) (limited to 'synapse/storage/stats.py') diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py index 4cb10dc9fb..5a7dfde926 100644 --- a/synapse/storage/stats.py +++ b/synapse/storage/stats.py @@ -17,8 +17,6 @@ import logging from itertools import chain -from twisted.internet import defer - from synapse.storage.state_deltas import StateDeltasStore logger = logging.getLogger(__name__) @@ -197,22 +195,41 @@ class StatsStore(StateDeltasStore): self._simple_update_one_txn(txn, table, keyvalues, current_row) def _upsert_copy_from_table_with_additive_relatives_txn( - self, txn, into_table, keyvalues, additive_relatives, src_table, copy_columns + self, + txn, + into_table, + keyvalues, + extra_dst_keyvalues, + additive_relatives, + src_table, + copy_columns, + additional_where="", ): """ Args: txn: Transaction into_table (str): The destination table to UPSERT the row into keyvalues (dict[str, any]): Row-identifying key values + extra_dst_keyvalues (dict[str, any]): Additional keyvalues + for `into_table`. additive_relatives (dict[str, any]): Fields that will be added onto if existing row present. (Must be disjoint from copy_columns.) src_table (str): The source table to copy from copy_columns (iterable[str]): The list of columns to copy + additional_where (str): Additional SQL for where (prefix with AND + if using). """ if self.database_engine.can_native_upsert: - ins_columns = chain(keyvalues, copy_columns, additive_relatives.keys()) + ins_columns = chain( + keyvalues, + copy_columns, + additive_relatives.keys(), + extra_dst_keyvalues.keys(), + ) sel_exprs = chain( - keyvalues, copy_columns, ("?" for _ in additive_relatives) + keyvalues, + copy_columns, + ("?" for _ in chain(additive_relatives, extra_dst_keyvalues)), ) keyvalues_where = ("%s = ?" % f for f in keyvalues) @@ -225,17 +242,20 @@ class StatsStore(StateDeltasStore): INSERT INTO %(into_table)s (%(ins_columns)s) SELECT %(sel_exprs)s FROM %(src_table)s - WHERE %(keyvalues_where)s + WHERE %(keyvalues_where)s %(additional_where)s ON CONFLICT (%(keyvalues)s) DO UPDATE SET %(sets)s """ % { "into_table": into_table, "ins_columns": ", ".join(ins_columns), "sel_exprs": ", ".join(sel_exprs), - "keyvalues_where": ", ".join(keyvalues_where), + "keyvalues_where": " AND ".join(keyvalues_where), "src_table": src_table, - "keyvalues": ", ".join(keyvalues.keys()), + "keyvalues": ", ".join( + chain(keyvalues.keys(), extra_dst_keyvalues.keys()) + ), "sets": ", ".join(chain(sets_cc, sets_ar)), + "additional_where": additional_where, } qargs = chain(additive_relatives.values(), keyvalues.values()) @@ -320,14 +340,15 @@ class StatsStore(StateDeltasStore): # we don't support absolute_fields for slice_field_names as it makes # no sense. per_slice_additive_relatives = { - key: fields.get(key, 0) - for key in slice_field_names + key: fields.get(key, 0) for key in slice_field_names } self._upsert_copy_from_table_with_additive_relatives_txn( txn, table + "_historical", {id_col: stats_id}, + {"end_ts": end_ts, "bucket_size": self.stats_bucket_size}, per_slice_additive_relatives, table + "_current", - abs_field_names + abs_field_names, + additional_where=" AND completed_delta_stream_id IS NOT NULL", ) -- cgit 1.5.1 From 4b7bf2e413a6012c87e3d12f7bf4183b9638836b Mon Sep 17 00:00:00 2001 From: reivilibre <38398653+reivilibre@users.noreply.github.com> Date: Tue, 27 Aug 2019 13:26:08 +0100 Subject: Apply suggestions from code review Co-Authored-By: Erik Johnston --- synapse/storage/stats.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'synapse/storage/stats.py') diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py index 5a7dfde926..62047839cc 100644 --- a/synapse/storage/stats.py +++ b/synapse/storage/stats.py @@ -134,7 +134,7 @@ class StatsStore(StateDeltasStore): def _upsert_with_additive_relatives_txn( self, txn, table, keyvalues, absolutes, additive_relatives ): - """ + """Used to update values in the stats tables. Args: txn: Transaction @@ -322,7 +322,7 @@ class StatsStore(StateDeltasStore): elif complete_with_stream_id is not None: absolute_fields = absolute_fields.copy() - absolute_fields["completed_delta_stream_id"] = complete_with_stream_id + absolute_fields["completed_delta_stream_id"] = complete_with_stream_id # first upsert the current table self._upsert_with_additive_relatives_txn( -- cgit 1.5.1 From 81c5289c839a6d6888cd849996572aa5c9e19fbd Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Tue, 27 Aug 2019 13:35:37 +0100 Subject: Clarify `_update_stats_delta_txn` by adding code comments and kwargs. Signed-off-by: Olivier Wilkinson (reivilibre) --- synapse/storage/stats.py | 34 ++++++++++++++++++---------------- 1 file changed, 18 insertions(+), 16 deletions(-) (limited to 'synapse/storage/stats.py') diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py index 62047839cc..7959f5785b 100644 --- a/synapse/storage/stats.py +++ b/synapse/storage/stats.py @@ -311,6 +311,9 @@ class StatsStore(StateDeltasStore): " for stats type %s" % (field, stats_type) ) + # only absolute stats fields are tracked in the `_current` stats tables, + # so those are the only ones that we process deltas for when + # we upsert against the `_current` table. additive_relatives = { key: fields.get(key, 0) for key in abs_field_names @@ -321,34 +324,33 @@ class StatsStore(StateDeltasStore): absolute_fields = {} elif complete_with_stream_id is not None: absolute_fields = absolute_fields.copy() - absolute_fields["completed_delta_stream_id"] = complete_with_stream_id - # first upsert the current table + # first upsert the `_current` table self._upsert_with_additive_relatives_txn( - txn, - table + "_current", - {id_col: stats_id}, - absolute_fields, - additive_relatives, + txn=txn, + table=table + "_current", + keyvalues={id_col: stats_id}, + absolutes=absolute_fields, + additive_relatives=additive_relatives, ) if self.has_completed_background_updates(): # TODO want to check specifically for stats regenerator, not all # background updates… - # then upsert the historical table. - # we don't support absolute_fields for slice_field_names as it makes + # then upsert the `_historical` table. + # we don't support absolute_fields for per-slice fields as it makes # no sense. per_slice_additive_relatives = { key: fields.get(key, 0) for key in slice_field_names } self._upsert_copy_from_table_with_additive_relatives_txn( - txn, - table + "_historical", - {id_col: stats_id}, - {"end_ts": end_ts, "bucket_size": self.stats_bucket_size}, - per_slice_additive_relatives, - table + "_current", - abs_field_names, + txn=txn, + into_table=table + "_historical", + keyvalues={id_col: stats_id}, + extra_dst_keyvalues={"end_ts": end_ts, "bucket_size": self.stats_bucket_size}, + additive_relatives=per_slice_additive_relatives, + src_table=table + "_current", + copy_columns=abs_field_names, additional_where=" AND completed_delta_stream_id IS NOT NULL", ) -- cgit 1.5.1 From 544ba2c2e9ad8d3d9aa9041e3f724e5c96a15390 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Tue, 27 Aug 2019 13:40:00 +0100 Subject: Apply minor suggestions from review Signed-off-by: Olivier Wilkinson (reivilibre) --- synapse/storage/stats.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) (limited to 'synapse/storage/stats.py') diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py index 7959f5785b..c950ab9953 100644 --- a/synapse/storage/stats.py +++ b/synapse/storage/stats.py @@ -190,8 +190,7 @@ class StatsStore(StateDeltasStore): else: for (key, val) in additive_relatives.items(): current_row[key] += val - for (key, val) in absolutes.items(): - current_row[key] = val + current_row.update(absolutes) self._simple_update_one_txn(txn, table, keyvalues, current_row) def _upsert_copy_from_table_with_additive_relatives_txn( @@ -223,8 +222,8 @@ class StatsStore(StateDeltasStore): ins_columns = chain( keyvalues, copy_columns, - additive_relatives.keys(), - extra_dst_keyvalues.keys(), + additive_relatives, + extra_dst_keyvalues, ) sel_exprs = chain( keyvalues, -- cgit 1.5.1 From a6c102009e219d93b512f682e5f799037536e3ee Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Tue, 27 Aug 2019 13:41:48 +0100 Subject: Lock tables in upsert fall-backs. Should not be too much of a performance concern as this code won't be hit on Postgres, which large deployments should be using. Signed-off-by: Olivier Wilkinson (reivilibre) --- synapse/storage/stats.py | 2 ++ 1 file changed, 2 insertions(+) (limited to 'synapse/storage/stats.py') diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py index c950ab9953..0f3aa6a801 100644 --- a/synapse/storage/stats.py +++ b/synapse/storage/stats.py @@ -180,6 +180,7 @@ class StatsStore(StateDeltasStore): txn.execute(sql, qargs) else: + self.database_engine.lock_table(txn, table) retcols = chain(absolutes.keys(), additive_relatives.keys()) current_row = self._simple_select_one_txn( txn, table, keyvalues, retcols, allow_none=True @@ -260,6 +261,7 @@ class StatsStore(StateDeltasStore): qargs = chain(additive_relatives.values(), keyvalues.values()) txn.execute(sql, qargs) else: + self.database_engine.lock_table(txn, into_table) src_row = self._simple_select_one_txn( txn, src_table, keyvalues, copy_columns ) -- cgit 1.5.1 From 736ac58e1191fc28abaef5c2cab86901b85ce192 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Tue, 27 Aug 2019 13:42:33 +0100 Subject: Code formatting (Black) Signed-off-by: Olivier Wilkinson (reivilibre) --- synapse/storage/stats.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) (limited to 'synapse/storage/stats.py') diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py index 0f3aa6a801..650c0050cb 100644 --- a/synapse/storage/stats.py +++ b/synapse/storage/stats.py @@ -221,10 +221,7 @@ class StatsStore(StateDeltasStore): """ if self.database_engine.can_native_upsert: ins_columns = chain( - keyvalues, - copy_columns, - additive_relatives, - extra_dst_keyvalues, + keyvalues, copy_columns, additive_relatives, extra_dst_keyvalues ) sel_exprs = chain( keyvalues, @@ -349,7 +346,10 @@ class StatsStore(StateDeltasStore): txn=txn, into_table=table + "_historical", keyvalues={id_col: stats_id}, - extra_dst_keyvalues={"end_ts": end_ts, "bucket_size": self.stats_bucket_size}, + extra_dst_keyvalues={ + "end_ts": end_ts, + "bucket_size": self.stats_bucket_size, + }, additive_relatives=per_slice_additive_relatives, src_table=table + "_current", copy_columns=abs_field_names, -- cgit 1.5.1 From 09cbc3a8e9c0f494fb272cb3761024a851b3e3f8 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Tue, 27 Aug 2019 13:50:58 +0100 Subject: Switch to milliseconds in room/user stats for consistency. Signed-off-by: Olivier Wilkinson (reivilibre) --- synapse/config/stats.py | 13 +++++-------- synapse/storage/schema/delta/56/stats_separated1.sql | 2 +- synapse/storage/stats.py | 3 ++- 3 files changed, 8 insertions(+), 10 deletions(-) (limited to 'synapse/storage/stats.py') diff --git a/synapse/config/stats.py b/synapse/config/stats.py index b518a3ed9c..b18ddbd1fa 100644 --- a/synapse/config/stats.py +++ b/synapse/config/stats.py @@ -27,19 +27,16 @@ class StatsConfig(Config): def read_config(self, config, **kwargs): self.stats_enabled = True - self.stats_bucket_size = 86400 + self.stats_bucket_size = 86400 * 1000 self.stats_retention = sys.maxsize stats_config = config.get("stats", None) if stats_config: self.stats_enabled = stats_config.get("enabled", self.stats_enabled) - self.stats_bucket_size = ( - self.parse_duration(stats_config.get("bucket_size", "1d")) / 1000 + self.stats_bucket_size = self.parse_duration( + stats_config.get("bucket_size", "1d") ) - self.stats_retention = ( - self.parse_duration( - stats_config.get("retention", "%ds" % (sys.maxsize,)) - ) - / 1000 + self.stats_retention = self.parse_duration( + stats_config.get("retention", "%ds" % (sys.maxsize,)) ) def generate_config_section(self, config_dir_path, server_name, **kwargs): diff --git a/synapse/storage/schema/delta/56/stats_separated1.sql b/synapse/storage/schema/delta/56/stats_separated1.sql index 95daf8f53b..045b5ca013 100644 --- a/synapse/storage/schema/delta/56/stats_separated1.sql +++ b/synapse/storage/schema/delta/56/stats_separated1.sql @@ -84,7 +84,7 @@ CREATE TABLE IF NOT EXISTS room_stats_current ( -- represents HISTORICAL room statistics for a room CREATE TABLE IF NOT EXISTS room_stats_historical ( room_id TEXT NOT NULL, - -- These stats cover the time from (end_ts - bucket_size)...end_ts (in seconds). + -- These stats cover the time from (end_ts - bucket_size)...end_ts (in ms). -- Note that end_ts is quantised. end_ts BIGINT NOT NULL, bucket_size INT NOT NULL, diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py index 650c0050cb..35fca1dc7b 100644 --- a/synapse/storage/stats.py +++ b/synapse/storage/stats.py @@ -64,7 +64,8 @@ class StatsStore(StateDeltasStore): Quantises a timestamp to be a multiple of the bucket size. Args: - ts (int): the timestamp to quantise, in seconds since the Unix Epoch + ts (int): the timestamp to quantise, in milliseconds since the Unix + Epoch Returns: int: a timestamp which -- cgit 1.5.1