diff options
author | Olivier Wilkinson (reivilibre) <olivier@librepush.net> | 2019-08-30 15:08:44 +0100 |
---|---|---|
committer | Olivier Wilkinson (reivilibre) <olivier@librepush.net> | 2019-08-30 15:08:44 +0100 |
commit | 7dc387e54c51145aa483508f68bd02b8d1a586f3 (patch) | |
tree | 0dbb19ac5c9926040d837e1c86f6e304e928f384 /synapse/storage/stats.py | |
parent | Merge branch 'rei/rss_inc7' into rei/rss_inc8 (diff) | |
parent | Code formatting and typo pointed out by Erik. (diff) | |
download | synapse-7dc387e54c51145aa483508f68bd02b8d1a586f3.tar.xz |
Merge branch 'rei/rss_inc7' into rei/rss_inc8
Diffstat (limited to 'synapse/storage/stats.py')
-rw-r--r-- | synapse/storage/stats.py | 41 |
1 files changed, 25 insertions, 16 deletions
diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py index f6c785854c..1d5aff00ec 100644 --- a/synapse/storage/stats.py +++ b/synapse/storage/stats.py @@ -157,9 +157,8 @@ class StatsStore(StateDeltasStore): and return its previous positions – atomically. """ - with self.stats_delta_processing_lock: - old = self._get_stats_positions_txn(txn, for_initial_processor=False) - self._update_stats_positions_txn(txn, None, for_initial_processor=False) + old = self._get_stats_positions_txn(txn, for_initial_processor=False) + self._update_stats_positions_txn(txn, None, for_initial_processor=False) return old @@ -210,13 +209,17 @@ class StatsStore(StateDeltasStore): WHERE completed_delta_stream_id IS NULL """ - for _k, (table, id_col) in TYPE_TO_TABLE: + for _k, (table, id_col) in TYPE_TO_TABLE.items(): txn.execute(sql % (table,)) # first wedge the incremental processor and reset our promise - old_positions = yield self.runInteraction( - "populate_stats_wedge", _wedge_incremental_processor - ) + yield self.stats_delta_processing_lock.acquire() + try: + old_positions = yield self.runInteraction( + "populate_stats_wedge", _wedge_incremental_processor + ) + finally: + yield self.stats_delta_processing_lock.release() if None in old_positions.values(): old_positions = None @@ -229,7 +232,12 @@ class StatsStore(StateDeltasStore): yield self._unwedge_incremental_processor(old_positions) - yield self.runInteraction("populate_stats_make_skeletons", _make_skeletons) + yield self.runInteraction( + "populate_stats_make_skeletons", _make_skeletons, "room" + ) + yield self.runInteraction( + "populate_stats_make_skeletons", _make_skeletons, "user" + ) self.get_earliest_token_for_stats.invalidate_all() yield self._end_background_update("populate_stats_prepare") @@ -260,10 +268,11 @@ class StatsStore(StateDeltasStore): # Get how many are left to process, so we can give status on how # far we are in processing - txn.execute( - "SELECT COUNT(*) FROM room_stats_current" - " WHERE completed_delta_stream_id IS NULL" - ) + sql = """ + SELECT COUNT(*) FROM user_stats_current + WHERE completed_delta_stream_id IS NULL + """ + txn.execute(sql) progress["remaining"] = txn.fetchone()[0] return users_to_work_on @@ -385,10 +394,10 @@ class StatsStore(StateDeltasStore): # Get how many are left to process, so we can give status on how # far we are in processing - txn.execute( - "SELECT COUNT(*) FROM room_stats_current" - " WHERE completed_delta_stream_id IS NULL" - ) + sql = """ + SELECT COUNT(*) FROM room_stats_current + WHERE completed_delta_stream_id IS NULL + """ progress["remaining"] = txn.fetchone()[0] return rooms_to_work_on |