diff options
author | Richard van der Hoff <richard@matrix.org> | 2020-03-19 10:29:20 +0000 |
---|---|---|
committer | Richard van der Hoff <richard@matrix.org> | 2020-03-19 10:29:20 +0000 |
commit | 7b66a1f0d9b7ec0aaa9809880aaa05a0d3df00ea (patch) | |
tree | 957503b2e31ff7518ae480699e49fbd3789469d5 /synapse/push | |
parent | Merge remote-tracking branch 'origin/develop' into matrix-org-hotfixes (diff) | |
parent | Fix concurrent modification errors in pusher metrics (#7106) (diff) | |
download | synapse-7b66a1f0d9b7ec0aaa9809880aaa05a0d3df00ea.tar.xz |
Merge remote-tracking branch 'origin/develop' into matrix-org-hotfixes
Diffstat (limited to 'synapse/push')
-rw-r--r-- | synapse/push/pusherpool.py | 27 |
1 files changed, 18 insertions, 9 deletions
diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py index bf721759df..88d203aa44 100644 --- a/synapse/push/pusherpool.py +++ b/synapse/push/pusherpool.py @@ -16,6 +16,7 @@ import logging from collections import defaultdict +from threading import Lock from typing import Dict, Tuple, Union from twisted.internet import defer @@ -56,12 +57,17 @@ class PusherPool: # map from user id to app_id:pushkey to pusher self.pushers = {} # type: Dict[str, Dict[str, Union[HttpPusher, EmailPusher]]] + # a lock for the pushers dict, since `count_pushers` is called from an different + # and we otherwise get concurrent modification errors + self._pushers_lock = Lock() + def count_pushers(): results = defaultdict(int) # type: Dict[Tuple[str, str], int] - for pushers in self.pushers.values(): - for pusher in pushers.values(): - k = (type(pusher).__name__, pusher.app_id) - results[k] += 1 + with self._pushers_lock: + for pushers in self.pushers.values(): + for pusher in pushers.values(): + k = (type(pusher).__name__, pusher.app_id) + results[k] += 1 return results LaterGauge( @@ -293,11 +299,12 @@ class PusherPool: return appid_pushkey = "%s:%s" % (pusherdict["app_id"], pusherdict["pushkey"]) - byuser = self.pushers.setdefault(pusherdict["user_name"], {}) - if appid_pushkey in byuser: - byuser[appid_pushkey].on_stop() - byuser[appid_pushkey] = p + with self._pushers_lock: + byuser = self.pushers.setdefault(pusherdict["user_name"], {}) + if appid_pushkey in byuser: + byuser[appid_pushkey].on_stop() + byuser[appid_pushkey] = p # Check if there *may* be push to process. We do this as this check is a # lot cheaper to do than actually fetching the exact rows we need to @@ -326,7 +333,9 @@ class PusherPool: if appid_pushkey in byuser: logger.info("Stopping pusher %s / %s", user_id, appid_pushkey) byuser[appid_pushkey].on_stop() - del byuser[appid_pushkey] + with self._pushers_lock: + del byuser[appid_pushkey] + yield self.store.delete_pusher_by_app_id_pushkey_user_id( app_id, pushkey, user_id ) |