summary refs log tree commit diff
diff options
context:
space:
mode:
authorRichard van der Hoff <richard@matrix.org>2020-03-19 10:29:20 +0000
committerRichard van der Hoff <richard@matrix.org>2020-03-19 10:29:20 +0000
commit7b66a1f0d9b7ec0aaa9809880aaa05a0d3df00ea (patch)
tree957503b2e31ff7518ae480699e49fbd3789469d5
parentMerge remote-tracking branch 'origin/develop' into matrix-org-hotfixes (diff)
parentFix concurrent modification errors in pusher metrics (#7106) (diff)
downloadsynapse-7b66a1f0d9b7ec0aaa9809880aaa05a0d3df00ea.tar.xz
Merge remote-tracking branch 'origin/develop' into matrix-org-hotfixes
-rw-r--r--changelog.d/7106.feature1
-rw-r--r--synapse/push/pusherpool.py27
2 files changed, 19 insertions, 9 deletions
diff --git a/changelog.d/7106.feature b/changelog.d/7106.feature
new file mode 100644
index 0000000000..413e7f29d7
--- /dev/null
+++ b/changelog.d/7106.feature
@@ -0,0 +1 @@
+Add prometheus metrics for the number of active pushers.
diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py
index bf721759df..88d203aa44 100644
--- a/synapse/push/pusherpool.py
+++ b/synapse/push/pusherpool.py
@@ -16,6 +16,7 @@
 
 import logging
 from collections import defaultdict
+from threading import Lock
 from typing import Dict, Tuple, Union
 
 from twisted.internet import defer
@@ -56,12 +57,17 @@ class PusherPool:
         # map from user id to app_id:pushkey to pusher
         self.pushers = {}  # type: Dict[str, Dict[str, Union[HttpPusher, EmailPusher]]]
 
+        # a lock for the pushers dict, since `count_pushers` is called from an different
+        # and we otherwise get concurrent modification errors
+        self._pushers_lock = Lock()
+
         def count_pushers():
             results = defaultdict(int)  # type: Dict[Tuple[str, str], int]
-            for pushers in self.pushers.values():
-                for pusher in pushers.values():
-                    k = (type(pusher).__name__, pusher.app_id)
-                    results[k] += 1
+            with self._pushers_lock:
+                for pushers in self.pushers.values():
+                    for pusher in pushers.values():
+                        k = (type(pusher).__name__, pusher.app_id)
+                        results[k] += 1
             return results
 
         LaterGauge(
@@ -293,11 +299,12 @@ class PusherPool:
             return
 
         appid_pushkey = "%s:%s" % (pusherdict["app_id"], pusherdict["pushkey"])
-        byuser = self.pushers.setdefault(pusherdict["user_name"], {})
 
-        if appid_pushkey in byuser:
-            byuser[appid_pushkey].on_stop()
-        byuser[appid_pushkey] = p
+        with self._pushers_lock:
+            byuser = self.pushers.setdefault(pusherdict["user_name"], {})
+            if appid_pushkey in byuser:
+                byuser[appid_pushkey].on_stop()
+            byuser[appid_pushkey] = p
 
         # Check if there *may* be push to process. We do this as this check is a
         # lot cheaper to do than actually fetching the exact rows we need to
@@ -326,7 +333,9 @@ class PusherPool:
         if appid_pushkey in byuser:
             logger.info("Stopping pusher %s / %s", user_id, appid_pushkey)
             byuser[appid_pushkey].on_stop()
-            del byuser[appid_pushkey]
+            with self._pushers_lock:
+                del byuser[appid_pushkey]
+
         yield self.store.delete_pusher_by_app_id_pushkey_user_id(
             app_id, pushkey, user_id
         )