diff options
author | Olivier Wilkinson (reivilibre) <olivier@librepush.net> | 2019-08-08 11:28:42 +0100 |
---|---|---|
committer | Olivier Wilkinson (reivilibre) <olivier@librepush.net> | 2019-08-08 11:28:42 +0100 |
commit | 5216299124ce4e66648cef53e3d4e419a746fc55 (patch) | |
tree | 57437f43d9e34433ebb4150b15b6fadd037b27c6 | |
parent | Schema delta for separated statistics (diff) | |
download | synapse-5216299124ce4e66648cef53e3d4e419a746fc55.tar.xz |
Use `threading.Lock` to prevent concurrent incremental position updates
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
-rw-r--r-- | synapse/handlers/stats.py | 19 | ||||
-rw-r--r-- | synapse/storage/stats.py | 4 |
2 files changed, 15 insertions, 8 deletions
diff --git a/synapse/handlers/stats.py b/synapse/handlers/stats.py index a0ee8db988..5de9a039a9 100644 --- a/synapse/handlers/stats.py +++ b/synapse/handlers/stats.py @@ -49,9 +49,6 @@ class StatsHandler(StateDeltasHandler): # The current position in the current_state_delta stream self.pos = None - # Guard to ensure we only process deltas one at a time - self._is_processing = False - if hs.config.stats_enabled: self.notifier.add_replication_callback(self.notify_new_event) @@ -65,18 +62,24 @@ class StatsHandler(StateDeltasHandler): if not self.hs.config.stats_enabled: return - if self._is_processing: - return + lock = self.store.stats_delta_processing_lock @defer.inlineCallbacks def process(): try: yield self._unsafe_process() finally: - self._is_processing = False + lock.release() - self._is_processing = True - run_as_background_process("stats.notify_new_event", process) + if lock.acquire(blocking=False): + # we only want to run this process one-at-a-time, + # and also, if the initial background updater wants us to keep out, + # we should respect that. + try: + run_as_background_process("stats.notify_new_event", process) + except: # noqa: E722 – re-raised so fine + lock.release() + raise @defer.inlineCallbacks def _unsafe_process(self): diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py index 1cec84ee2e..9196fa664a 100644 --- a/synapse/storage/stats.py +++ b/synapse/storage/stats.py @@ -14,6 +14,8 @@ # limitations under the License. import logging +from itertools import chain +from threading import Lock from twisted.internet import defer @@ -51,6 +53,8 @@ class StatsStore(StateDeltasStore): self.stats_enabled = hs.config.stats_enabled self.stats_bucket_size = hs.config.stats_bucket_size + self.stats_delta_processing_lock = Lock() + self.register_background_update_handler( "populate_stats_createtables", self._populate_stats_createtables ) |