From 5216299124ce4e66648cef53e3d4e419a746fc55 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Thu, 8 Aug 2019 11:28:42 +0100 Subject: Use `threading.Lock` to prevent concurrent incremental position updates Signed-off-by: Olivier Wilkinson (reivilibre) --- synapse/handlers/stats.py | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) (limited to 'synapse/handlers/stats.py') diff --git a/synapse/handlers/stats.py b/synapse/handlers/stats.py index a0ee8db988..5de9a039a9 100644 --- a/synapse/handlers/stats.py +++ b/synapse/handlers/stats.py @@ -49,9 +49,6 @@ class StatsHandler(StateDeltasHandler): # The current position in the current_state_delta stream self.pos = None - # Guard to ensure we only process deltas one at a time - self._is_processing = False - if hs.config.stats_enabled: self.notifier.add_replication_callback(self.notify_new_event) @@ -65,18 +62,24 @@ class StatsHandler(StateDeltasHandler): if not self.hs.config.stats_enabled: return - if self._is_processing: - return + lock = self.store.stats_delta_processing_lock @defer.inlineCallbacks def process(): try: yield self._unsafe_process() finally: - self._is_processing = False + lock.release() - self._is_processing = True - run_as_background_process("stats.notify_new_event", process) + if lock.acquire(blocking=False): + # we only want to run this process one-at-a-time, + # and also, if the initial background updater wants us to keep out, + # we should respect that. + try: + run_as_background_process("stats.notify_new_event", process) + except: # noqa: E722 – re-raised so fine + lock.release() + raise @defer.inlineCallbacks def _unsafe_process(self): -- cgit 1.5.1