summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--synapse/storage/stats.py41
1 files changed, 25 insertions, 16 deletions
diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py
index f6c785854c..1d5aff00ec 100644
--- a/synapse/storage/stats.py
+++ b/synapse/storage/stats.py
@@ -157,9 +157,8 @@ class StatsStore(StateDeltasStore):
             and return its previous positions – atomically.
             """
 
-            with self.stats_delta_processing_lock:
-                old = self._get_stats_positions_txn(txn, for_initial_processor=False)
-                self._update_stats_positions_txn(txn, None, for_initial_processor=False)
+            old = self._get_stats_positions_txn(txn, for_initial_processor=False)
+            self._update_stats_positions_txn(txn, None, for_initial_processor=False)
 
             return old
 
@@ -210,13 +209,17 @@ class StatsStore(StateDeltasStore):
                     WHERE completed_delta_stream_id IS NULL
             """
 
-            for _k, (table, id_col) in TYPE_TO_TABLE:
+            for _k, (table, id_col) in TYPE_TO_TABLE.items():
                 txn.execute(sql % (table,))
 
         # first wedge the incremental processor and reset our promise
-        old_positions = yield self.runInteraction(
-            "populate_stats_wedge", _wedge_incremental_processor
-        )
+        yield self.stats_delta_processing_lock.acquire()
+        try:
+            old_positions = yield self.runInteraction(
+                "populate_stats_wedge", _wedge_incremental_processor
+            )
+        finally:
+            yield self.stats_delta_processing_lock.release()
 
         if None in old_positions.values():
             old_positions = None
@@ -229,7 +232,12 @@ class StatsStore(StateDeltasStore):
 
         yield self._unwedge_incremental_processor(old_positions)
 
-        yield self.runInteraction("populate_stats_make_skeletons", _make_skeletons)
+        yield self.runInteraction(
+            "populate_stats_make_skeletons", _make_skeletons, "room"
+        )
+        yield self.runInteraction(
+            "populate_stats_make_skeletons", _make_skeletons, "user"
+        )
         self.get_earliest_token_for_stats.invalidate_all()
 
         yield self._end_background_update("populate_stats_prepare")
@@ -260,10 +268,11 @@ class StatsStore(StateDeltasStore):
 
             # Get how many are left to process, so we can give status on how
             # far we are in processing
-            txn.execute(
-                "SELECT COUNT(*) FROM room_stats_current"
-                " WHERE completed_delta_stream_id IS NULL"
-            )
+            sql = """
+                    SELECT COUNT(*) FROM user_stats_current
+                    WHERE completed_delta_stream_id IS NULL
+                """
+            txn.execute(sql)
             progress["remaining"] = txn.fetchone()[0]
 
             return users_to_work_on
@@ -385,10 +394,10 @@ class StatsStore(StateDeltasStore):
 
             # Get how many are left to process, so we can give status on how
             # far we are in processing
-            txn.execute(
-                "SELECT COUNT(*) FROM room_stats_current"
-                " WHERE completed_delta_stream_id IS NULL"
-            )
+            sql = """
+                    SELECT COUNT(*) FROM room_stats_current
+                    WHERE completed_delta_stream_id IS NULL
+                """
             progress["remaining"] = txn.fetchone()[0]
 
             return rooms_to_work_on