summary refs log tree commit diff
path: root/synapse/push/pusherpool.py
diff options
context:
space:
mode:
authorRichard van der Hoff <richard@matrix.org>2019-06-26 22:34:41 +0100
committerRichard van der Hoff <richard@matrix.org>2019-06-26 22:34:41 +0100
commita4daa899ec4cd195fc10936f68df5c78314b366c (patch)
tree35e88ff388b0f7652773a79930b732aa04f16bde /synapse/push/pusherpool.py
parentchangelog (diff)
parentImprove docs on choosing server_name (#5558) (diff)
downloadsynapse-a4daa899ec4cd195fc10936f68df5c78314b366c.tar.xz
Merge branch 'develop' into rav/saml2_client
Diffstat (limited to 'synapse/push/pusherpool.py')
-rw-r--r--synapse/push/pusherpool.py127
1 files changed, 76 insertions, 51 deletions
diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py
index 40a7709c09..df6f670740 100644
--- a/synapse/push/pusherpool.py
+++ b/synapse/push/pusherpool.py
@@ -40,6 +40,7 @@ class PusherPool:
     notifications are sent; accordingly Pusher.on_started, Pusher.on_new_notifications and
     Pusher.on_new_receipts are not expected to return deferreds.
     """
+
     def __init__(self, _hs):
         self.hs = _hs
         self.pusher_factory = PusherFactory(_hs)
@@ -57,30 +58,47 @@ class PusherPool:
         run_as_background_process("start_pushers", self._start_pushers)
 
     @defer.inlineCallbacks
-    def add_pusher(self, user_id, access_token, kind, app_id,
-                   app_display_name, device_display_name, pushkey, lang, data,
-                   profile_tag=""):
+    def add_pusher(
+        self,
+        user_id,
+        access_token,
+        kind,
+        app_id,
+        app_display_name,
+        device_display_name,
+        pushkey,
+        lang,
+        data,
+        profile_tag="",
+    ):
+        """Creates a new pusher and adds it to the pool
+
+        Returns:
+            Deferred[EmailPusher|HttpPusher]
+        """
         time_now_msec = self.clock.time_msec()
 
         # we try to create the pusher just to validate the config: it
         # will then get pulled out of the database,
         # recreated, added and started: this means we have only one
         # code path adding pushers.
-        self.pusher_factory.create_pusher({
-            "id": None,
-            "user_name": user_id,
-            "kind": kind,
-            "app_id": app_id,
-            "app_display_name": app_display_name,
-            "device_display_name": device_display_name,
-            "pushkey": pushkey,
-            "ts": time_now_msec,
-            "lang": lang,
-            "data": data,
-            "last_stream_ordering": None,
-            "last_success": None,
-            "failing_since": None
-        })
+        self.pusher_factory.create_pusher(
+            {
+                "id": None,
+                "user_name": user_id,
+                "kind": kind,
+                "app_id": app_id,
+                "app_display_name": app_display_name,
+                "device_display_name": device_display_name,
+                "pushkey": pushkey,
+                "ts": time_now_msec,
+                "lang": lang,
+                "data": data,
+                "last_stream_ordering": None,
+                "last_success": None,
+                "failing_since": None,
+            }
+        )
 
         # create the pusher setting last_stream_ordering to the current maximum
         # stream ordering in event_push_actions, so it will process
@@ -103,21 +121,24 @@ class PusherPool:
             last_stream_ordering=last_stream_ordering,
             profile_tag=profile_tag,
         )
-        yield self.start_pusher_by_id(app_id, pushkey, user_id)
+        pusher = yield self.start_pusher_by_id(app_id, pushkey, user_id)
+
+        defer.returnValue(pusher)
 
     @defer.inlineCallbacks
-    def remove_pushers_by_app_id_and_pushkey_not_user(self, app_id, pushkey,
-                                                      not_user_id):
-        to_remove = yield self.store.get_pushers_by_app_id_and_pushkey(
-            app_id, pushkey
-        )
+    def remove_pushers_by_app_id_and_pushkey_not_user(
+        self, app_id, pushkey, not_user_id
+    ):
+        to_remove = yield self.store.get_pushers_by_app_id_and_pushkey(app_id, pushkey)
         for p in to_remove:
-            if p['user_name'] != not_user_id:
+            if p["user_name"] != not_user_id:
                 logger.info(
                     "Removing pusher for app id %s, pushkey %s, user %s",
-                    app_id, pushkey, p['user_name']
+                    app_id,
+                    pushkey,
+                    p["user_name"],
                 )
-                yield self.remove_pusher(p['app_id'], p['pushkey'], p['user_name'])
+                yield self.remove_pusher(p["app_id"], p["pushkey"], p["user_name"])
 
     @defer.inlineCallbacks
     def remove_pushers_by_access_token(self, user_id, access_tokens):
@@ -131,14 +152,14 @@ class PusherPool:
         """
         tokens = set(access_tokens)
         for p in (yield self.store.get_pushers_by_user_id(user_id)):
-            if p['access_token'] in tokens:
+            if p["access_token"] in tokens:
                 logger.info(
                     "Removing pusher for app id %s, pushkey %s, user %s",
-                    p['app_id'], p['pushkey'], p['user_name']
-                )
-                yield self.remove_pusher(
-                    p['app_id'], p['pushkey'], p['user_name'],
+                    p["app_id"],
+                    p["pushkey"],
+                    p["user_name"],
                 )
+                yield self.remove_pusher(p["app_id"], p["pushkey"], p["user_name"])
 
     @defer.inlineCallbacks
     def on_new_notifications(self, min_stream_id, max_stream_id):
@@ -184,21 +205,26 @@ class PusherPool:
 
     @defer.inlineCallbacks
     def start_pusher_by_id(self, app_id, pushkey, user_id):
-        """Look up the details for the given pusher, and start it"""
+        """Look up the details for the given pusher, and start it
+
+        Returns:
+            Deferred[EmailPusher|HttpPusher|None]: The pusher started, if any
+        """
         if not self._should_start_pushers:
             return
 
-        resultlist = yield self.store.get_pushers_by_app_id_and_pushkey(
-            app_id, pushkey
-        )
+        resultlist = yield self.store.get_pushers_by_app_id_and_pushkey(app_id, pushkey)
 
-        p = None
+        pusher_dict = None
         for r in resultlist:
-            if r['user_name'] == user_id:
-                p = r
+            if r["user_name"] == user_id:
+                pusher_dict = r
 
-        if p:
-            yield self._start_pusher(p)
+        pusher = None
+        if pusher_dict:
+            pusher = yield self._start_pusher(pusher_dict)
+
+        defer.returnValue(pusher)
 
     @defer.inlineCallbacks
     def _start_pushers(self):
@@ -224,16 +250,16 @@ class PusherPool:
             pusherdict (dict):
 
         Returns:
-            None
+            Deferred[EmailPusher|HttpPusher]
         """
         try:
             p = self.pusher_factory.create_pusher(pusherdict)
         except PusherConfigException as e:
             logger.warning(
                 "Pusher incorrectly configured user=%s, appid=%s, pushkey=%s: %s",
-                pusherdict.get('user_name'),
-                pusherdict.get('app_id'),
-                pusherdict.get('pushkey'),
+                pusherdict.get("user_name"),
+                pusherdict.get("app_id"),
+                pusherdict.get("pushkey"),
                 e,
             )
             return
@@ -244,11 +270,8 @@ class PusherPool:
         if not p:
             return
 
-        appid_pushkey = "%s:%s" % (
-            pusherdict['app_id'],
-            pusherdict['pushkey'],
-        )
-        byuser = self.pushers.setdefault(pusherdict['user_name'], {})
+        appid_pushkey = "%s:%s" % (pusherdict["app_id"], pusherdict["pushkey"])
+        byuser = self.pushers.setdefault(pusherdict["user_name"], {})
 
         if appid_pushkey in byuser:
             byuser[appid_pushkey].on_stop()
@@ -261,7 +284,7 @@ class PusherPool:
         last_stream_ordering = pusherdict["last_stream_ordering"]
         if last_stream_ordering:
             have_notifs = yield self.store.get_if_maybe_push_in_range_for_user(
-                user_id, last_stream_ordering,
+                user_id, last_stream_ordering
             )
         else:
             # We always want to default to starting up the pusher rather than
@@ -270,6 +293,8 @@ class PusherPool:
 
         p.on_started(have_notifs)
 
+        defer.returnValue(p)
+
     @defer.inlineCallbacks
     def remove_pusher(self, app_id, pushkey, user_id):
         appid_pushkey = "%s:%s" % (app_id, pushkey)