summary refs log tree commit diff
path: root/synapse/push/pusherpool.py
diff options
context:
space:
mode:
authorErik Johnston <erikj@jki.re>2019-04-02 18:23:32 +0100
committerGitHub <noreply@github.com>2019-04-02 18:23:32 +0100
commit3039d61baf7b4ab914a2d659737337cd09127a51 (patch)
tree672c4e457a5596a408423ff37c1ead65475f73b3 /synapse/push/pusherpool.py
parentTransfer related groups on room upgrade (#4990) (diff)
parents/misc/feature/ (diff)
downloadsynapse-3039d61baf7b4ab914a2d659737337cd09127a51.tar.xz
Merge pull request #4991 from matrix-org/erikj/stagger_push_startup
Make starting pushers faster during start up
Diffstat (limited to 'synapse/push/pusherpool.py')
-rw-r--r--synapse/push/pusherpool.py28
1 files changed, 24 insertions, 4 deletions
diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py
index abf1a1a9c1..40a7709c09 100644
--- a/synapse/push/pusherpool.py
+++ b/synapse/push/pusherpool.py
@@ -21,6 +21,7 @@ from twisted.internet import defer
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.push import PusherConfigException
 from synapse.push.pusher import PusherFactory
+from synapse.util.async_helpers import concurrently_execute
 
 logger = logging.getLogger(__name__)
 
@@ -197,7 +198,7 @@ class PusherPool:
                 p = r
 
         if p:
-            self._start_pusher(p)
+            yield self._start_pusher(p)
 
     @defer.inlineCallbacks
     def _start_pushers(self):
@@ -208,10 +209,14 @@ class PusherPool:
         """
         pushers = yield self.store.get_all_pushers()
         logger.info("Starting %d pushers", len(pushers))
-        for pusherdict in pushers:
-            self._start_pusher(pusherdict)
+
+        # Stagger starting up the pushers so we don't completely drown the
+        # process on start up.
+        yield concurrently_execute(self._start_pusher, pushers, 10)
+
         logger.info("Started pushers")
 
+    @defer.inlineCallbacks
     def _start_pusher(self, pusherdict):
         """Start the given pusher
 
@@ -248,7 +253,22 @@ class PusherPool:
         if appid_pushkey in byuser:
             byuser[appid_pushkey].on_stop()
         byuser[appid_pushkey] = p
-        p.on_started()
+
+        # Check if there *may* be push to process. We do this as this check is a
+        # lot cheaper to do than actually fetching the exact rows we need to
+        # push.
+        user_id = pusherdict["user_name"]
+        last_stream_ordering = pusherdict["last_stream_ordering"]
+        if last_stream_ordering:
+            have_notifs = yield self.store.get_if_maybe_push_in_range_for_user(
+                user_id, last_stream_ordering,
+            )
+        else:
+            # We always want to default to starting up the pusher rather than
+            # risk missing push.
+            have_notifs = True
+
+        p.on_started(have_notifs)
 
     @defer.inlineCallbacks
     def remove_pusher(self, app_id, pushkey, user_id):