summary refs log tree commit diff
path: root/synapse/push
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2019-04-02 16:45:33 +0100
committerErik Johnston <erik@matrix.org>2019-04-02 16:59:13 +0100
commit5bec8d660d4b7d940336048c2ce937225840b5f4 (patch)
treeab7d1c156e0995658ac0e52e0078337638c87094 /synapse/push
parentFix sync bug when accepting invites (#4956) (diff)
downloadsynapse-5bec8d660d4b7d940336048c2ce937225840b5f4.tar.xz
Make starting pushers faster during start up
We start all pushers on start up and immediately start a background
process to fetch push to send. This makes start up incredibly painful
when dealing with many pushers.

Instead, let's do a quick fast DB check to see if there *may* be push to
send and only start the background processes for those pushers. We also
stagger starting up and doing those checks so that we don't try and
handle all pushers at once.
Diffstat (limited to 'synapse/push')
-rw-r--r--synapse/push/emailpusher.py11
-rw-r--r--synapse/push/httppusher.py12
-rw-r--r--synapse/push/pusherpool.py28
3 files changed, 43 insertions, 8 deletions
diff --git a/synapse/push/emailpusher.py b/synapse/push/emailpusher.py
index 50e1007d84..e8ee67401f 100644
--- a/synapse/push/emailpusher.py
+++ b/synapse/push/emailpusher.py
@@ -72,8 +72,15 @@ class EmailPusher(object):
 
         self._is_processing = False
 
-    def on_started(self):
-        if self.mailer is not None:
+    def on_started(self, should_check_for_notifs):
+        """Called when this pusher has been started.
+
+        Args:
+            should_check_for_notifs (bool): Whether we should immediately
+                check for push to send. Set to False only if it's known there
+                is nothing to send
+        """
+        if should_check_for_notifs and self.mailer is not None:
             self._start_processing()
 
     def on_stop(self):
diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py
index e65f8c63d3..fac05aa44c 100644
--- a/synapse/push/httppusher.py
+++ b/synapse/push/httppusher.py
@@ -112,8 +112,16 @@ class HttpPusher(object):
         self.data_minus_url.update(self.data)
         del self.data_minus_url['url']
 
-    def on_started(self):
-        self._start_processing()
+    def on_started(self, should_check_for_notifs):
+        """Called when this pusher has been started.
+
+        Args:
+            should_check_for_notifs (bool): Whether we should immediately
+                check for push to send. Set to False only if it's known there
+                is nothing to send
+        """
+        if should_check_for_notifs:
+            self._start_processing()
 
     def on_new_notifications(self, min_stream_ordering, max_stream_ordering):
         self.max_stream_ordering = max(max_stream_ordering, self.max_stream_ordering or 0)
diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py
index abf1a1a9c1..40a7709c09 100644
--- a/synapse/push/pusherpool.py
+++ b/synapse/push/pusherpool.py
@@ -21,6 +21,7 @@ from twisted.internet import defer
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.push import PusherConfigException
 from synapse.push.pusher import PusherFactory
+from synapse.util.async_helpers import concurrently_execute
 
 logger = logging.getLogger(__name__)
 
@@ -197,7 +198,7 @@ class PusherPool:
                 p = r
 
         if p:
-            self._start_pusher(p)
+            yield self._start_pusher(p)
 
     @defer.inlineCallbacks
     def _start_pushers(self):
@@ -208,10 +209,14 @@ class PusherPool:
         """
         pushers = yield self.store.get_all_pushers()
         logger.info("Starting %d pushers", len(pushers))
-        for pusherdict in pushers:
-            self._start_pusher(pusherdict)
+
+        # Stagger starting up the pushers so we don't completely drown the
+        # process on start up.
+        yield concurrently_execute(self._start_pusher, pushers, 10)
+
         logger.info("Started pushers")
 
+    @defer.inlineCallbacks
     def _start_pusher(self, pusherdict):
         """Start the given pusher
 
@@ -248,7 +253,22 @@ class PusherPool:
         if appid_pushkey in byuser:
             byuser[appid_pushkey].on_stop()
         byuser[appid_pushkey] = p
-        p.on_started()
+
+        # Check if there *may* be push to process. We do this as this check is a
+        # lot cheaper to do than actually fetching the exact rows we need to
+        # push.
+        user_id = pusherdict["user_name"]
+        last_stream_ordering = pusherdict["last_stream_ordering"]
+        if last_stream_ordering:
+            have_notifs = yield self.store.get_if_maybe_push_in_range_for_user(
+                user_id, last_stream_ordering,
+            )
+        else:
+            # We always want to default to starting up the pusher rather than
+            # risk missing push.
+            have_notifs = True
+
+        p.on_started(have_notifs)
 
     @defer.inlineCallbacks
     def remove_pusher(self, app_id, pushkey, user_id):