diff options
Diffstat (limited to 'synapse/push/pusherpool.py')
-rw-r--r-- | synapse/push/pusherpool.py | 59 |
1 files changed, 45 insertions, 14 deletions
diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py index 08e840fdc2..88d203aa44 100644 --- a/synapse/push/pusherpool.py +++ b/synapse/push/pusherpool.py @@ -15,11 +15,17 @@ # limitations under the License. import logging +from collections import defaultdict +from threading import Lock +from typing import Dict, Tuple, Union from twisted.internet import defer +from synapse.metrics import LaterGauge from synapse.metrics.background_process_metrics import run_as_background_process from synapse.push import PusherConfigException +from synapse.push.emailpusher import EmailPusher +from synapse.push.httppusher import HttpPusher from synapse.push.pusher import PusherFactory from synapse.util.async_helpers import concurrently_execute @@ -47,7 +53,29 @@ class PusherPool: self._should_start_pushers = _hs.config.start_pushers self.store = self.hs.get_datastore() self.clock = self.hs.get_clock() - self.pushers = {} + + # map from user id to app_id:pushkey to pusher + self.pushers = {} # type: Dict[str, Dict[str, Union[HttpPusher, EmailPusher]]] + + # a lock for the pushers dict, since `count_pushers` is called from an different + # and we otherwise get concurrent modification errors + self._pushers_lock = Lock() + + def count_pushers(): + results = defaultdict(int) # type: Dict[Tuple[str, str], int] + with self._pushers_lock: + for pushers in self.pushers.values(): + for pusher in pushers.values(): + k = (type(pusher).__name__, pusher.app_id) + results[k] += 1 + return results + + LaterGauge( + name="synapse_pushers", + desc="the number of active pushers", + labels=["kind", "app_id"], + caller=count_pushers, + ) def start(self): """Starts the pushers off in a background process. @@ -103,9 +131,7 @@ class PusherPool: # create the pusher setting last_stream_ordering to the current maximum # stream ordering in event_push_actions, so it will process # pushes from this point onwards. - last_stream_ordering = ( - yield self.store.get_latest_push_action_stream_ordering() - ) + last_stream_ordering = yield self.store.get_latest_push_action_stream_ordering() yield self.store.add_pusher( user_id=user_id, @@ -193,7 +219,7 @@ class PusherPool: min_stream_id - 1, max_stream_id ) # This returns a tuple, user_id is at index 3 - users_affected = set([r[3] for r in updated_receipts]) + users_affected = {r[3] for r in updated_receipts} for u in users_affected: if u in self.pushers: @@ -234,7 +260,6 @@ class PusherPool: Deferred """ pushers = yield self.store.get_all_pushers() - logger.info("Starting %d pushers", len(pushers)) # Stagger starting up the pushers so we don't completely drown the # process on start up. @@ -247,7 +272,7 @@ class PusherPool: """Start the given pusher Args: - pusherdict (dict): + pusherdict (dict): dict with the values pulled from the db table Returns: Deferred[EmailPusher|HttpPusher] @@ -256,7 +281,8 @@ class PusherPool: p = self.pusher_factory.create_pusher(pusherdict) except PusherConfigException as e: logger.warning( - "Pusher incorrectly configured user=%s, appid=%s, pushkey=%s: %s", + "Pusher incorrectly configured id=%i, user=%s, appid=%s, pushkey=%s: %s", + pusherdict["id"], pusherdict.get("user_name"), pusherdict.get("app_id"), pusherdict.get("pushkey"), @@ -264,18 +290,21 @@ class PusherPool: ) return except Exception: - logger.exception("Couldn't start a pusher: caught Exception") + logger.exception( + "Couldn't start pusher id %i: caught Exception", pusherdict["id"], + ) return if not p: return appid_pushkey = "%s:%s" % (pusherdict["app_id"], pusherdict["pushkey"]) - byuser = self.pushers.setdefault(pusherdict["user_name"], {}) - if appid_pushkey in byuser: - byuser[appid_pushkey].on_stop() - byuser[appid_pushkey] = p + with self._pushers_lock: + byuser = self.pushers.setdefault(pusherdict["user_name"], {}) + if appid_pushkey in byuser: + byuser[appid_pushkey].on_stop() + byuser[appid_pushkey] = p # Check if there *may* be push to process. We do this as this check is a # lot cheaper to do than actually fetching the exact rows we need to @@ -304,7 +333,9 @@ class PusherPool: if appid_pushkey in byuser: logger.info("Stopping pusher %s / %s", user_id, appid_pushkey) byuser[appid_pushkey].on_stop() - del byuser[appid_pushkey] + with self._pushers_lock: + del byuser[appid_pushkey] + yield self.store.delete_pusher_by_app_id_pushkey_user_id( app_id, pushkey, user_id ) |