From 5216299124ce4e66648cef53e3d4e419a746fc55 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Thu, 8 Aug 2019 11:28:42 +0100 Subject: Use `threading.Lock` to prevent concurrent incremental position updates Signed-off-by: Olivier Wilkinson (reivilibre) --- synapse/handlers/stats.py | 19 +++++++++++-------- synapse/storage/stats.py | 4 ++++ 2 files changed, 15 insertions(+), 8 deletions(-) diff --git a/synapse/handlers/stats.py b/synapse/handlers/stats.py index a0ee8db988..5de9a039a9 100644 --- a/synapse/handlers/stats.py +++ b/synapse/handlers/stats.py @@ -49,9 +49,6 @@ class StatsHandler(StateDeltasHandler): # The current position in the current_state_delta stream self.pos = None - # Guard to ensure we only process deltas one at a time - self._is_processing = False - if hs.config.stats_enabled: self.notifier.add_replication_callback(self.notify_new_event) @@ -65,18 +62,24 @@ class StatsHandler(StateDeltasHandler): if not self.hs.config.stats_enabled: return - if self._is_processing: - return + lock = self.store.stats_delta_processing_lock @defer.inlineCallbacks def process(): try: yield self._unsafe_process() finally: - self._is_processing = False + lock.release() - self._is_processing = True - run_as_background_process("stats.notify_new_event", process) + if lock.acquire(blocking=False): + # we only want to run this process one-at-a-time, + # and also, if the initial background updater wants us to keep out, + # we should respect that. + try: + run_as_background_process("stats.notify_new_event", process) + except: # noqa: E722 – re-raised so fine + lock.release() + raise @defer.inlineCallbacks def _unsafe_process(self): diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py index 1cec84ee2e..9196fa664a 100644 --- a/synapse/storage/stats.py +++ b/synapse/storage/stats.py @@ -14,6 +14,8 @@ # limitations under the License. import logging +from itertools import chain +from threading import Lock from twisted.internet import defer @@ -51,6 +53,8 @@ class StatsStore(StateDeltasStore): self.stats_enabled = hs.config.stats_enabled self.stats_bucket_size = hs.config.stats_bucket_size + self.stats_delta_processing_lock = Lock() + self.register_background_update_handler( "populate_stats_createtables", self._populate_stats_createtables ) -- cgit 1.4.1