1 files changed, 11 insertions, 8 deletions
diff --git a/synapse/handlers/stats.py b/synapse/handlers/stats.py
index a0ee8db988..5de9a039a9 100644
--- a/synapse/handlers/stats.py
+++ b/synapse/handlers/stats.py
@@ -49,9 +49,6 @@ class StatsHandler(StateDeltasHandler):
# The current position in the current_state_delta stream
self.pos = None
- # Guard to ensure we only process deltas one at a time
- self._is_processing = False
-
if hs.config.stats_enabled:
self.notifier.add_replication_callback(self.notify_new_event)
@@ -65,18 +62,24 @@ class StatsHandler(StateDeltasHandler):
if not self.hs.config.stats_enabled:
return
- if self._is_processing:
- return
+ lock = self.store.stats_delta_processing_lock
@defer.inlineCallbacks
def process():
try:
yield self._unsafe_process()
finally:
- self._is_processing = False
+ lock.release()
- self._is_processing = True
- run_as_background_process("stats.notify_new_event", process)
+ if lock.acquire(blocking=False):
+ # we only want to run this process one-at-a-time,
+ # and also, if the initial background updater wants us to keep out,
+ # we should respect that.
+ try:
+ run_as_background_process("stats.notify_new_event", process)
+ except: # noqa: E722 – re-raised so fine
+ lock.release()
+ raise
@defer.inlineCallbacks
def _unsafe_process(self):
|