summary refs log tree commit diff
path: root/synapse/push/pusherpool.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/push/pusherpool.py')
-rw-r--r--synapse/push/pusherpool.py45
1 files changed, 38 insertions, 7 deletions
diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py
index b9dca5bc63..88d203aa44 100644
--- a/synapse/push/pusherpool.py
+++ b/synapse/push/pusherpool.py
@@ -15,11 +15,17 @@
 # limitations under the License.
 
 import logging
+from collections import defaultdict
+from threading import Lock
+from typing import Dict, Tuple, Union
 
 from twisted.internet import defer
 
+from synapse.metrics import LaterGauge
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.push import PusherConfigException
+from synapse.push.emailpusher import EmailPusher
+from synapse.push.httppusher import HttpPusher
 from synapse.push.pusher import PusherFactory
 from synapse.util.async_helpers import concurrently_execute
 
@@ -47,7 +53,29 @@ class PusherPool:
         self._should_start_pushers = _hs.config.start_pushers
         self.store = self.hs.get_datastore()
         self.clock = self.hs.get_clock()
-        self.pushers = {}
+
+        # map from user id to app_id:pushkey to pusher
+        self.pushers = {}  # type: Dict[str, Dict[str, Union[HttpPusher, EmailPusher]]]
+
+        # a lock for the pushers dict, since `count_pushers` is called from an different
+        # and we otherwise get concurrent modification errors
+        self._pushers_lock = Lock()
+
+        def count_pushers():
+            results = defaultdict(int)  # type: Dict[Tuple[str, str], int]
+            with self._pushers_lock:
+                for pushers in self.pushers.values():
+                    for pusher in pushers.values():
+                        k = (type(pusher).__name__, pusher.app_id)
+                        results[k] += 1
+            return results
+
+        LaterGauge(
+            name="synapse_pushers",
+            desc="the number of active pushers",
+            labels=["kind", "app_id"],
+            caller=count_pushers,
+        )
 
     def start(self):
         """Starts the pushers off in a background process.
@@ -191,7 +219,7 @@ class PusherPool:
                 min_stream_id - 1, max_stream_id
             )
             # This returns a tuple, user_id is at index 3
-            users_affected = set([r[3] for r in updated_receipts])
+            users_affected = {r[3] for r in updated_receipts}
 
             for u in users_affected:
                 if u in self.pushers:
@@ -271,11 +299,12 @@ class PusherPool:
             return
 
         appid_pushkey = "%s:%s" % (pusherdict["app_id"], pusherdict["pushkey"])
-        byuser = self.pushers.setdefault(pusherdict["user_name"], {})
 
-        if appid_pushkey in byuser:
-            byuser[appid_pushkey].on_stop()
-        byuser[appid_pushkey] = p
+        with self._pushers_lock:
+            byuser = self.pushers.setdefault(pusherdict["user_name"], {})
+            if appid_pushkey in byuser:
+                byuser[appid_pushkey].on_stop()
+            byuser[appid_pushkey] = p
 
         # Check if there *may* be push to process. We do this as this check is a
         # lot cheaper to do than actually fetching the exact rows we need to
@@ -304,7 +333,9 @@ class PusherPool:
         if appid_pushkey in byuser:
             logger.info("Stopping pusher %s / %s", user_id, appid_pushkey)
             byuser[appid_pushkey].on_stop()
-            del byuser[appid_pushkey]
+            with self._pushers_lock:
+                del byuser[appid_pushkey]
+
         yield self.store.delete_pusher_by_app_id_pushkey_user_id(
             app_id, pushkey, user_id
         )