diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py
index 08e840fdc2..88d203aa44 100644
--- a/synapse/push/pusherpool.py
+++ b/synapse/push/pusherpool.py
@@ -15,11 +15,17 @@
# limitations under the License.
import logging
+from collections import defaultdict
+from threading import Lock
+from typing import Dict, Tuple, Union
from twisted.internet import defer
+from synapse.metrics import LaterGauge
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.push import PusherConfigException
+from synapse.push.emailpusher import EmailPusher
+from synapse.push.httppusher import HttpPusher
from synapse.push.pusher import PusherFactory
from synapse.util.async_helpers import concurrently_execute
@@ -47,7 +53,29 @@ class PusherPool:
self._should_start_pushers = _hs.config.start_pushers
self.store = self.hs.get_datastore()
self.clock = self.hs.get_clock()
- self.pushers = {}
+
+ # map from user id to app_id:pushkey to pusher
+ self.pushers = {} # type: Dict[str, Dict[str, Union[HttpPusher, EmailPusher]]]
+
+ # a lock for the pushers dict, since `count_pushers` is called from an different
+ # and we otherwise get concurrent modification errors
+ self._pushers_lock = Lock()
+
+ def count_pushers():
+ results = defaultdict(int) # type: Dict[Tuple[str, str], int]
+ with self._pushers_lock:
+ for pushers in self.pushers.values():
+ for pusher in pushers.values():
+ k = (type(pusher).__name__, pusher.app_id)
+ results[k] += 1
+ return results
+
+ LaterGauge(
+ name="synapse_pushers",
+ desc="the number of active pushers",
+ labels=["kind", "app_id"],
+ caller=count_pushers,
+ )
def start(self):
"""Starts the pushers off in a background process.
@@ -103,9 +131,7 @@ class PusherPool:
# create the pusher setting last_stream_ordering to the current maximum
# stream ordering in event_push_actions, so it will process
# pushes from this point onwards.
- last_stream_ordering = (
- yield self.store.get_latest_push_action_stream_ordering()
- )
+ last_stream_ordering = yield self.store.get_latest_push_action_stream_ordering()
yield self.store.add_pusher(
user_id=user_id,
@@ -193,7 +219,7 @@ class PusherPool:
min_stream_id - 1, max_stream_id
)
# This returns a tuple, user_id is at index 3
- users_affected = set([r[3] for r in updated_receipts])
+ users_affected = {r[3] for r in updated_receipts}
for u in users_affected:
if u in self.pushers:
@@ -234,7 +260,6 @@ class PusherPool:
Deferred
"""
pushers = yield self.store.get_all_pushers()
- logger.info("Starting %d pushers", len(pushers))
# Stagger starting up the pushers so we don't completely drown the
# process on start up.
@@ -247,7 +272,7 @@ class PusherPool:
"""Start the given pusher
Args:
- pusherdict (dict):
+ pusherdict (dict): dict with the values pulled from the db table
Returns:
Deferred[EmailPusher|HttpPusher]
@@ -256,7 +281,8 @@ class PusherPool:
p = self.pusher_factory.create_pusher(pusherdict)
except PusherConfigException as e:
logger.warning(
- "Pusher incorrectly configured user=%s, appid=%s, pushkey=%s: %s",
+ "Pusher incorrectly configured id=%i, user=%s, appid=%s, pushkey=%s: %s",
+ pusherdict["id"],
pusherdict.get("user_name"),
pusherdict.get("app_id"),
pusherdict.get("pushkey"),
@@ -264,18 +290,21 @@ class PusherPool:
)
return
except Exception:
- logger.exception("Couldn't start a pusher: caught Exception")
+ logger.exception(
+ "Couldn't start pusher id %i: caught Exception", pusherdict["id"],
+ )
return
if not p:
return
appid_pushkey = "%s:%s" % (pusherdict["app_id"], pusherdict["pushkey"])
- byuser = self.pushers.setdefault(pusherdict["user_name"], {})
- if appid_pushkey in byuser:
- byuser[appid_pushkey].on_stop()
- byuser[appid_pushkey] = p
+ with self._pushers_lock:
+ byuser = self.pushers.setdefault(pusherdict["user_name"], {})
+ if appid_pushkey in byuser:
+ byuser[appid_pushkey].on_stop()
+ byuser[appid_pushkey] = p
# Check if there *may* be push to process. We do this as this check is a
# lot cheaper to do than actually fetching the exact rows we need to
@@ -304,7 +333,9 @@ class PusherPool:
if appid_pushkey in byuser:
logger.info("Stopping pusher %s / %s", user_id, appid_pushkey)
byuser[appid_pushkey].on_stop()
- del byuser[appid_pushkey]
+ with self._pushers_lock:
+ del byuser[appid_pushkey]
+
yield self.store.delete_pusher_by_app_id_pushkey_user_id(
app_id, pushkey, user_id
)
|