From b06f2947e4a79220564da94f2ca3feb988f033ed Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Wed, 28 Aug 2019 15:46:33 +0100 Subject: Track new users in user statistics. This makes the rows 'completed' so that the stats regenerator need not touch them. --- synapse/storage/registration.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py index 9027b917c1..672eebbe56 100644 --- a/synapse/storage/registration.py +++ b/synapse/storage/registration.py @@ -868,6 +868,17 @@ class RegistrationStore( (user_id_obj.localpart, create_profile_with_displayname), ) + if self.hs.config.stats_enabled: + # we create a new completed user statistics row + + # we don't strictly need current_token since this user really can't + # have any state deltas before now (as it is a new user), but still, + # we include it for completeness. + current_token = self._get_max_stream_id_in_current_state_deltas_txn(txn) + self._update_stats_delta_txn( + txn, now, "user", user_id, {}, complete_with_stream_id=current_token + ) + self._invalidate_cache_and_stream(txn, self.get_user_by_id, (user_id,)) txn.call_after(self.is_guest.invalidate, (user_id,)) @@ -1139,6 +1150,7 @@ class RegistrationStore( deferred str|None: A str representing a link to redirect the user to if there is one. """ + # Insert everything into a transaction in order to run atomically def validate_threepid_session_txn(txn): row = self._simple_select_one_txn( -- cgit 1.5.1 From 73d552a05d68b2895b796ca95def75778cd00cc2 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Wed, 28 Aug 2019 16:14:00 +0100 Subject: Hoist up None check to prevent trying to iterate over NoneType.keys() --- synapse/storage/stats.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py index 3df57b52ea..3433eef288 100644 --- a/synapse/storage/stats.py +++ b/synapse/storage/stats.py @@ -258,6 +258,10 @@ class StatsStore(StateDeltasStore): (i.e. not deltas) of absolute fields. Does not work with per-slice fields. 
""" + + if absolute_field_overrides is None: + absolute_field_overrides = {} + table, id_col = TYPE_TO_TABLE[stats_type] quantised_ts = self.quantise_stats_time(int(ts)) @@ -288,9 +292,6 @@ class StatsStore(StateDeltasStore): if key not in absolute_field_overrides } - if absolute_field_overrides is None: - absolute_field_overrides = {} - if complete_with_stream_id is not None: absolute_field_overrides = absolute_field_overrides.copy() absolute_field_overrides[ -- cgit 1.5.1 From 3b69bf3e74a5b9247f306c6c1e74be09f79822bd Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Wed, 28 Aug 2019 20:18:45 +0100 Subject: Upsert fixes --- synapse/storage/stats.py | 23 +++++++++++++++-------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py index 3433eef288..358ef3fd73 100644 --- a/synapse/storage/stats.py +++ b/synapse/storage/stats.py @@ -320,9 +320,10 @@ class StatsStore(StateDeltasStore): txn=txn, into_table=table + "_historical", keyvalues={id_col: stats_id}, + extra_dst_insvalues={"bucket_size": self.stats_bucket_size, }, extra_dst_keyvalues={ "end_ts": end_ts, - "bucket_size": self.stats_bucket_size, + }, additive_relatives=per_slice_additive_relatives, src_table=table + "_current", @@ -356,7 +357,7 @@ class StatsStore(StateDeltasStore): ] insert_cols = [] - qargs = [table] + qargs = [] for (key, val) in chain( keyvalues.items(), absolutes.items(), additive_relatives.items() @@ -367,13 +368,14 @@ class StatsStore(StateDeltasStore): sql = """ INSERT INTO %(table)s (%(insert_cols_cs)s) VALUES (%(insert_vals_qs)s) - ON CONFLICT DO UPDATE SET %(updates)s + ON CONFLICT (%(key_columns)s) DO UPDATE SET %(updates)s """ % { "table": table, "insert_cols_cs": ", ".join(insert_cols), "insert_vals_qs": ", ".join( ["?"] * (len(keyvalues) + len(absolutes) + len(additive_relatives)) ), + "key_columns": ", ".join(keyvalues), "updates": ", ".join(chain(absolute_updates, relative_updates)), } @@ -399,6 +401,7 @@ class StatsStore(StateDeltasStore): into_table, keyvalues, extra_dst_keyvalues, + extra_dst_insvalues, additive_relatives, src_table, copy_columns, @@ -411,6 +414,8 @@ class StatsStore(StateDeltasStore): keyvalues (dict[str, any]): Row-identifying key values extra_dst_keyvalues (dict[str, any]): Additional keyvalues for `into_table`. + extra_dst_insvalues (dict[str, any]): Additional values to insert + on new row creation for `into_table`. additive_relatives (dict[str, any]): Fields that will be added onto if existing row present. (Must be disjoint from copy_columns.) src_table (str): The source table to copy from @@ -420,18 +425,18 @@ class StatsStore(StateDeltasStore): """ if self.database_engine.can_native_upsert: ins_columns = chain( - keyvalues, copy_columns, additive_relatives, extra_dst_keyvalues + keyvalues, copy_columns, additive_relatives, extra_dst_keyvalues, extra_dst_insvalues ) sel_exprs = chain( keyvalues, copy_columns, - ("?" for _ in chain(additive_relatives, extra_dst_keyvalues)), + ("?" for _ in chain(additive_relatives, extra_dst_keyvalues, extra_dst_insvalues)), ) keyvalues_where = ("%s = ?" 
% f for f in keyvalues) sets_cc = ("%s = EXCLUDED.%s" % (f, f) for f in copy_columns) sets_ar = ( - "%s = EXCLUDED.%s + %s.%s" % (f, f, into_table, f) for f in copy_columns + "%s = EXCLUDED.%s + %s.%s" % (f, f, into_table, f) for f in additive_relatives ) sql = """ @@ -454,7 +459,8 @@ class StatsStore(StateDeltasStore): "additional_where": additional_where, } - qargs = chain(additive_relatives.values(), keyvalues.values()) + qargs = list(chain(additive_relatives.values(), extra_dst_keyvalues.values(), extra_dst_insvalues.values(), + keyvalues.values())) txn.execute(sql, qargs) else: self.database_engine.lock_table(txn, into_table) @@ -470,7 +476,8 @@ class StatsStore(StateDeltasStore): ) if dest_current_row is None: - merged_dict = {**keyvalues, **src_row, **additive_relatives} + merged_dict = {**keyvalues, **extra_dst_keyvalues, **extra_dst_insvalues, **src_row, + **additive_relatives} self._simple_insert_txn(txn, into_table, merged_dict) else: for (key, val) in additive_relatives.items(): -- cgit 1.5.1 From 4444b9a1b3444fec6a40ec42eadad5ec51ea1eee Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Thu, 29 Aug 2019 08:08:41 +0100 Subject: Code formatting (Black) Signed-off-by: Olivier Wilkinson (reivilibre) --- synapse/storage/stats.py | 42 ++++++++++++++++++++++++++++++------------ 1 file changed, 30 insertions(+), 12 deletions(-) diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py index 358ef3fd73..8bbf1b00d1 100644 --- a/synapse/storage/stats.py +++ b/synapse/storage/stats.py @@ -320,11 +320,8 @@ class StatsStore(StateDeltasStore): txn=txn, into_table=table + "_historical", keyvalues={id_col: stats_id}, - extra_dst_insvalues={"bucket_size": self.stats_bucket_size, }, - extra_dst_keyvalues={ - "end_ts": end_ts, - - }, + extra_dst_insvalues={"bucket_size": self.stats_bucket_size}, + extra_dst_keyvalues={"end_ts": end_ts}, additive_relatives=per_slice_additive_relatives, src_table=table + "_current", copy_columns=abs_field_names, @@ -425,18 +422,28 @@ class StatsStore(StateDeltasStore): """ if self.database_engine.can_native_upsert: ins_columns = chain( - keyvalues, copy_columns, additive_relatives, extra_dst_keyvalues, extra_dst_insvalues + keyvalues, + copy_columns, + additive_relatives, + extra_dst_keyvalues, + extra_dst_insvalues, ) sel_exprs = chain( keyvalues, copy_columns, - ("?" for _ in chain(additive_relatives, extra_dst_keyvalues, extra_dst_insvalues)), + ( + "?" + for _ in chain( + additive_relatives, extra_dst_keyvalues, extra_dst_insvalues + ) + ), ) keyvalues_where = ("%s = ?" 
% f for f in keyvalues) sets_cc = ("%s = EXCLUDED.%s" % (f, f) for f in copy_columns) sets_ar = ( - "%s = EXCLUDED.%s + %s.%s" % (f, f, into_table, f) for f in additive_relatives + "%s = EXCLUDED.%s + %s.%s" % (f, f, into_table, f) + for f in additive_relatives ) sql = """ @@ -459,8 +466,14 @@ class StatsStore(StateDeltasStore): "additional_where": additional_where, } - qargs = list(chain(additive_relatives.values(), extra_dst_keyvalues.values(), extra_dst_insvalues.values(), - keyvalues.values())) + qargs = list( + chain( + additive_relatives.values(), + extra_dst_keyvalues.values(), + extra_dst_insvalues.values(), + keyvalues.values(), + ) + ) txn.execute(sql, qargs) else: self.database_engine.lock_table(txn, into_table) @@ -476,8 +489,13 @@ class StatsStore(StateDeltasStore): ) if dest_current_row is None: - merged_dict = {**keyvalues, **extra_dst_keyvalues, **extra_dst_insvalues, **src_row, - **additive_relatives} + merged_dict = { + **keyvalues, + **extra_dst_keyvalues, + **extra_dst_insvalues, + **src_row, + **additive_relatives, + } self._simple_insert_txn(txn, into_table, merged_dict) else: for (key, val) in additive_relatives.items(): -- cgit 1.5.1 From 757205d7186132ae6d1ae189249e6218dd11e32d Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Fri, 30 Aug 2019 14:26:02 +0100 Subject: Convert `chain` to `list` as `chain` is only once iterable. Signed-off-by: Olivier Wilkinson (reivilibre) --- synapse/storage/stats.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py index 63c8c2840a..20ce3664a0 100644 --- a/synapse/storage/stats.py +++ b/synapse/storage/stats.py @@ -381,7 +381,7 @@ class StatsStore(StateDeltasStore): txn.execute(sql, qargs) else: self.database_engine.lock_table(txn, table) - retcols = chain(absolutes.keys(), additive_relatives.keys()) + retcols = list(chain(absolutes.keys(), additive_relatives.keys())) current_row = self._simple_select_one_txn( txn, table, keyvalues, retcols, allow_none=True ) @@ -486,7 +486,7 @@ class StatsStore(StateDeltasStore): txn, into_table, keyvalues, - chain(additive_relatives.keys(), copy_columns), + retcols=list(chain(additive_relatives.keys(), copy_columns)), allow_none=True, ) -- cgit 1.5.1
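A note on the "Upsert fixes" commit above, for readers skimming the diff: it corrects two things in the native-upsert path. PostgreSQL rejects ON CONFLICT DO UPDATE without a conflict target, so the generated statement now names the key columns explicitly, and the additive SET clauses are now built from additive_relatives rather than copy_columns, so counters are combined with the existing row via EXCLUDED. The following is a standalone sketch of what the plain INSERT ... VALUES ... ON CONFLICT template in that commit expands to; the table and column names (room_stats_current, room_id, current_state_events, total_events) are invented for illustration and are not taken from this patch.

    # Renders the (fixed) upsert template with hypothetical names to show its shape.
    table = "room_stats_current"
    keyvalues = {"room_id": "!room:example.org"}
    absolutes = {"current_state_events": 5}    # overwritten on conflict
    additive_relatives = {"total_events": 7}   # summed with the existing row

    sql = """
        INSERT INTO %(table)s (%(insert_cols_cs)s)
        VALUES (%(insert_vals_qs)s)
        ON CONFLICT (%(key_columns)s) DO UPDATE SET %(updates)s
    """ % {
        "table": table,
        "insert_cols_cs": ", ".join(list(keyvalues) + list(absolutes) + list(additive_relatives)),
        "insert_vals_qs": ", ".join(
            ["?"] * (len(keyvalues) + len(absolutes) + len(additive_relatives))
        ),
        "key_columns": ", ".join(keyvalues),
        "updates": ", ".join(
            ["%s = EXCLUDED.%s" % (f, f) for f in absolutes]
            + ["%s = EXCLUDED.%s + %s.%s" % (f, f, table, f) for f in additive_relatives]
        ),
    }
    print(sql)
    # Roughly:
    #   INSERT INTO room_stats_current (room_id, current_state_events, total_events)
    #   VALUES (?, ?, ?)
    #   ON CONFLICT (room_id) DO UPDATE SET
    #     current_state_events = EXCLUDED.current_state_events,
    #     total_events = EXCLUDED.total_events + room_stats_current.total_events

This snippet only prints the statement; actually executing it would additionally require a unique constraint on the conflict-target column(s), which is assumed here rather than shown.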
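The rationale in the final commit ("`chain` is only once iterable") also deserves a concrete illustration: itertools.chain returns a single-use iterator, so handing it to a helper that walks the column list more than once leaves the second pass with an exhausted iterator and silently incomplete results. Below is a minimal, self-contained sketch of that failure mode; select_columns and the column names are invented stand-ins, not the actual Synapse helper.

    from itertools import chain

    def select_columns(row, cols):
        # Toy stand-in for a DB helper that walks its column list twice:
        # once to build the query text, once to map returned values to names.
        sql = "SELECT %s FROM stats" % ", ".join(cols)   # first pass over cols
        values = [row[c] for c in cols]                  # second pass over cols
        return sql, values

    row = {"joined_members": 3, "total_events": 7}

    # A chain object is exhausted by the first pass, so the second pass sees nothing:
    print(select_columns(row, chain(["joined_members"], ["total_events"])))
    # -> ('SELECT joined_members, total_events FROM stats', [])

    # Materialising it with list() first, as the commit does, keeps both passes consistent:
    print(select_columns(row, list(chain(["joined_members"], ["total_events"]))))
    # -> ('SELECT joined_members, total_events FROM stats', [3, 7])

Wrapping the chain in list() up front, as the commit does for the retcols arguments, makes the column sequence safely reusable.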