diff --git a/changelog.d/7106.feature b/changelog.d/7106.feature
new file mode 100644
index 0000000000..413e7f29d7
--- /dev/null
+++ b/changelog.d/7106.feature
@@ -0,0 +1 @@
+Add prometheus metrics for the number of active pushers.
diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py
index bf721759df..88d203aa44 100644
--- a/synapse/push/pusherpool.py
+++ b/synapse/push/pusherpool.py
@@ -16,6 +16,7 @@
import logging
from collections import defaultdict
+from threading import Lock
from typing import Dict, Tuple, Union
from twisted.internet import defer
@@ -56,12 +57,17 @@ class PusherPool:
# map from user id to app_id:pushkey to pusher
self.pushers = {} # type: Dict[str, Dict[str, Union[HttpPusher, EmailPusher]]]
+ # a lock for the pushers dict, since `count_pushers` is called from an different
+ # and we otherwise get concurrent modification errors
+ self._pushers_lock = Lock()
+
def count_pushers():
results = defaultdict(int) # type: Dict[Tuple[str, str], int]
- for pushers in self.pushers.values():
- for pusher in pushers.values():
- k = (type(pusher).__name__, pusher.app_id)
- results[k] += 1
+ with self._pushers_lock:
+ for pushers in self.pushers.values():
+ for pusher in pushers.values():
+ k = (type(pusher).__name__, pusher.app_id)
+ results[k] += 1
return results
LaterGauge(
@@ -293,11 +299,12 @@ class PusherPool:
return
appid_pushkey = "%s:%s" % (pusherdict["app_id"], pusherdict["pushkey"])
- byuser = self.pushers.setdefault(pusherdict["user_name"], {})
- if appid_pushkey in byuser:
- byuser[appid_pushkey].on_stop()
- byuser[appid_pushkey] = p
+ with self._pushers_lock:
+ byuser = self.pushers.setdefault(pusherdict["user_name"], {})
+ if appid_pushkey in byuser:
+ byuser[appid_pushkey].on_stop()
+ byuser[appid_pushkey] = p
# Check if there *may* be push to process. We do this as this check is a
# lot cheaper to do than actually fetching the exact rows we need to
@@ -326,7 +333,9 @@ class PusherPool:
if appid_pushkey in byuser:
logger.info("Stopping pusher %s / %s", user_id, appid_pushkey)
byuser[appid_pushkey].on_stop()
- del byuser[appid_pushkey]
+ with self._pushers_lock:
+ del byuser[appid_pushkey]
+
yield self.store.delete_pusher_by_app_id_pushkey_user_id(
app_id, pushkey, user_id
)
|