summary refs log tree commit diff
diff options
context:
space:
mode:
authorOlivier Wilkinson (reivilibre) <olivier@librepush.net>2019-08-08 11:28:42 +0100
committerOlivier Wilkinson (reivilibre) <olivier@librepush.net>2019-08-08 11:28:42 +0100
commit5216299124ce4e66648cef53e3d4e419a746fc55 (patch)
tree57437f43d9e34433ebb4150b15b6fadd037b27c6
parentSchema delta for separated statistics (diff)
downloadsynapse-5216299124ce4e66648cef53e3d4e419a746fc55.tar.xz
Use `threading.Lock` to prevent concurrent incremental position updates
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
-rw-r--r--synapse/handlers/stats.py19
-rw-r--r--synapse/storage/stats.py4
2 files changed, 15 insertions, 8 deletions
diff --git a/synapse/handlers/stats.py b/synapse/handlers/stats.py
index a0ee8db988..5de9a039a9 100644
--- a/synapse/handlers/stats.py
+++ b/synapse/handlers/stats.py
@@ -49,9 +49,6 @@ class StatsHandler(StateDeltasHandler):
         # The current position in the current_state_delta stream
         self.pos = None
 
-        # Guard to ensure we only process deltas one at a time
-        self._is_processing = False
-
         if hs.config.stats_enabled:
             self.notifier.add_replication_callback(self.notify_new_event)
 
@@ -65,18 +62,24 @@ class StatsHandler(StateDeltasHandler):
         if not self.hs.config.stats_enabled:
             return
 
-        if self._is_processing:
-            return
+        lock = self.store.stats_delta_processing_lock
 
         @defer.inlineCallbacks
         def process():
             try:
                 yield self._unsafe_process()
             finally:
-                self._is_processing = False
+                lock.release()
 
-        self._is_processing = True
-        run_as_background_process("stats.notify_new_event", process)
+        if lock.acquire(blocking=False):
+            # we only want to run this process one-at-a-time,
+            # and also, if the initial background updater wants us to keep out,
+            # we should respect that.
+            try:
+                run_as_background_process("stats.notify_new_event", process)
+            except:  # noqa: E722 – re-raised so fine
+                lock.release()
+                raise
 
     @defer.inlineCallbacks
     def _unsafe_process(self):
diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py
index 1cec84ee2e..9196fa664a 100644
--- a/synapse/storage/stats.py
+++ b/synapse/storage/stats.py
@@ -14,6 +14,8 @@
 # limitations under the License.
 
 import logging
+from itertools import chain
+from threading import Lock
 
 from twisted.internet import defer
 
@@ -51,6 +53,8 @@ class StatsStore(StateDeltasStore):
         self.stats_enabled = hs.config.stats_enabled
         self.stats_bucket_size = hs.config.stats_bucket_size
 
+        self.stats_delta_processing_lock = Lock()
+
         self.register_background_update_handler(
             "populate_stats_createtables", self._populate_stats_createtables
         )