diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py
index 9386e9e150..5f8ab9464f 100644
--- a/synapse/storage/stats.py
+++ b/synapse/storage/stats.py
@@ -157,9 +157,8 @@ class StatsStore(StateDeltasStore):
and return its previous positions – atomically.
"""
- with self.stats_delta_processing_lock:
- old = self._get_stats_positions_txn(txn, for_initial_processor=False)
- self._update_stats_positions_txn(txn, None, for_initial_processor=False)
+ old = self._get_stats_positions_txn(txn, for_initial_processor=False)
+ self._update_stats_positions_txn(txn, None, for_initial_processor=False)
return old
@@ -210,13 +209,17 @@ class StatsStore(StateDeltasStore):
WHERE completed_delta_stream_id IS NULL
"""
- for _k, (table, id_col) in TYPE_TO_TABLE:
+ for _k, (table, id_col) in TYPE_TO_TABLE.items():
txn.execute(sql % (table,))
# first wedge the incremental processor and reset our promise
- old_positions = yield self.runInteraction(
- "populate_stats_wedge", _wedge_incremental_processor
- )
+ yield self.stats_delta_processing_lock.acquire()
+ try:
+ old_positions = yield self.runInteraction(
+ "populate_stats_wedge", _wedge_incremental_processor
+ )
+ finally:
+ yield self.stats_delta_processing_lock.release()
if None in old_positions.values():
old_positions = None
@@ -229,7 +232,8 @@ class StatsStore(StateDeltasStore):
yield self._unwedge_incremental_processor(old_positions)
- yield self.runInteraction("populate_stats_make_skeletons", _make_skeletons)
+ yield self.runInteraction("populate_stats_make_skeletons", _make_skeletons, "room")
+ yield self.runInteraction("populate_stats_make_skeletons", _make_skeletons, "user")
self.get_earliest_token_for_stats.invalidate_all()
yield self._end_background_update("populate_stats_prepare")
|