summary refs log tree commit diff
path: root/synapse/push/httppusher.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/push/httppusher.py')
-rw-r--r--synapse/push/httppusher.py86
1 files changed, 45 insertions, 41 deletions
diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py
index eff0975b6a..5408aa1295 100644
--- a/synapse/push/httppusher.py
+++ b/synapse/push/httppusher.py
@@ -14,19 +14,25 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import logging
+import urllib.parse
+from typing import TYPE_CHECKING, Any, Dict, Iterable, Union
 
 from prometheus_client import Counter
 
 from twisted.internet.error import AlreadyCalled, AlreadyCancelled
 
 from synapse.api.constants import EventTypes
+from synapse.events import EventBase
 from synapse.logging import opentracing
 from synapse.metrics.background_process_metrics import run_as_background_process
-from synapse.push import PusherConfigException
+from synapse.push import Pusher, PusherConfigException
 from synapse.types import RoomStreamToken
 
 from . import push_rule_evaluator, push_tools
 
+if TYPE_CHECKING:
+    from synapse.app.homeserver import HomeServer
+
 logger = logging.getLogger(__name__)
 
 http_push_processed_counter = Counter(
@@ -50,24 +56,18 @@ http_badges_failed_counter = Counter(
 )
 
 
-class HttpPusher:
+class HttpPusher(Pusher):
     INITIAL_BACKOFF_SEC = 1  # in seconds because that's what Twisted takes
     MAX_BACKOFF_SEC = 60 * 60
 
     # This one's in ms because we compare it against the clock
     GIVE_UP_AFTER_MS = 24 * 60 * 60 * 1000
 
-    def __init__(self, hs, pusherdict):
-        self.hs = hs
-        self.store = self.hs.get_datastore()
+    def __init__(self, hs: "HomeServer", pusherdict: Dict[str, Any]):
+        super().__init__(hs, pusherdict)
         self.storage = self.hs.get_storage()
-        self.clock = self.hs.get_clock()
-        self.state_handler = self.hs.get_state_handler()
-        self.user_id = pusherdict["user_name"]
-        self.app_id = pusherdict["app_id"]
         self.app_display_name = pusherdict["app_display_name"]
         self.device_display_name = pusherdict["device_display_name"]
-        self.pushkey = pusherdict["pushkey"]
         self.pushkey_ts = pusherdict["ts"]
         self.data = pusherdict["data"]
         self.last_stream_ordering = pusherdict["last_stream_ordering"]
@@ -77,13 +77,6 @@ class HttpPusher:
         self._is_processing = False
         self._group_unread_count_by_room = hs.config.push_group_unread_count_by_room
 
-        # This is the highest stream ordering we know it's safe to process.
-        # When new events arrive, we'll be given a window of new events: we
-        # should honour this rather than just looking for anything higher
-        # because of potential out-of-order event serialisation. This starts
-        # off as None though as we don't know any better.
-        self.max_stream_ordering = None
-
         if "data" not in pusherdict:
             raise PusherConfigException("No 'data' key for HTTP pusher")
         self.data = pusherdict["data"]
@@ -97,26 +90,39 @@ class HttpPusher:
         if self.data is None:
             raise PusherConfigException("data can not be null for HTTP pusher")
 
+        # Validate that there's a URL and it is of the proper form.
         if "url" not in self.data:
             raise PusherConfigException("'url' required in data for HTTP pusher")
-        self.url = self.data["url"]
-        self.http_client = hs.get_proxied_http_client()
+
+        url = self.data["url"]
+        if not isinstance(url, str):
+            raise PusherConfigException("'url' must be a string")
+        url_parts = urllib.parse.urlparse(url)
+        # Note that the specification also says the scheme must be HTTPS, but
+        # it isn't up to the homeserver to verify that.
+        if url_parts.path != "/_matrix/push/v1/notify":
+            raise PusherConfigException(
+                "'url' must have a path of '/_matrix/push/v1/notify'"
+            )
+
+        self.url = url
+        self.http_client = hs.get_proxied_blacklisted_http_client()
         self.data_minus_url = {}
         self.data_minus_url.update(self.data)
         del self.data_minus_url["url"]
 
-    def on_started(self, should_check_for_notifs):
+    def on_started(self, should_check_for_notifs: bool) -> None:
         """Called when this pusher has been started.
 
         Args:
-            should_check_for_notifs (bool): Whether we should immediately
+            should_check_for_notifs: Whether we should immediately
                 check for push to send. Set to False only if it's known there
                 is nothing to send
         """
         if should_check_for_notifs:
             self._start_processing()
 
-    def on_new_notifications(self, max_token: RoomStreamToken):
+    def on_new_notifications(self, max_token: RoomStreamToken) -> None:
         # We just use the minimum stream ordering and ignore the vector clock
         # component. This is safe to do as long as we *always* ignore the vector
         # clock components.
@@ -127,14 +133,14 @@ class HttpPusher:
         )
         self._start_processing()
 
-    def on_new_receipts(self, min_stream_id, max_stream_id):
+    def on_new_receipts(self, min_stream_id: int, max_stream_id: int) -> None:
         # Note that the min here shouldn't be relied upon to be accurate.
 
         # We could check the receipts are actually m.read receipts here,
         # but currently that's the only type of receipt anyway...
         run_as_background_process("http_pusher.on_new_receipts", self._update_badge)
 
-    async def _update_badge(self):
+    async def _update_badge(self) -> None:
         # XXX as per https://github.com/matrix-org/matrix-doc/issues/2627, this seems
         # to be largely redundant. perhaps we can remove it.
         badge = await push_tools.get_badge_count(
@@ -144,10 +150,10 @@ class HttpPusher:
         )
         await self._send_badge(badge)
 
-    def on_timer(self):
+    def on_timer(self) -> None:
         self._start_processing()
 
-    def on_stop(self):
+    def on_stop(self) -> None:
         if self.timed_call:
             try:
                 self.timed_call.cancel()
@@ -155,13 +161,13 @@ class HttpPusher:
                 pass
             self.timed_call = None
 
-    def _start_processing(self):
+    def _start_processing(self) -> None:
         if self._is_processing:
             return
 
         run_as_background_process("httppush.process", self._process)
 
-    async def _process(self):
+    async def _process(self) -> None:
         # we should never get here if we are already processing
         assert not self._is_processing
 
@@ -180,7 +186,7 @@ class HttpPusher:
         finally:
             self._is_processing = False
 
-    async def _unsafe_process(self):
+    async def _unsafe_process(self) -> None:
         """
         Looks for unset notifications and dispatch them, in order
         Never call this directly: use _process which will only allow this to
@@ -188,6 +194,7 @@ class HttpPusher:
         """
 
         fn = self.store.get_unread_push_actions_for_user_in_range_for_http
+        assert self.max_stream_ordering is not None
         unprocessed = await fn(
             self.user_id, self.last_stream_ordering, self.max_stream_ordering
         )
@@ -257,17 +264,12 @@ class HttpPusher:
                     )
                     self.backoff_delay = HttpPusher.INITIAL_BACKOFF_SEC
                     self.last_stream_ordering = push_action["stream_ordering"]
-                    pusher_still_exists = await self.store.update_pusher_last_stream_ordering(
+                    await self.store.update_pusher_last_stream_ordering(
                         self.app_id,
                         self.pushkey,
                         self.user_id,
                         self.last_stream_ordering,
                     )
-                    if not pusher_still_exists:
-                        # The pusher has been deleted while we were processing, so
-                        # lets just stop and return.
-                        self.on_stop()
-                        return
 
                     self.failing_since = None
                     await self.store.update_pusher_failing_since(
@@ -283,7 +285,7 @@ class HttpPusher:
                     )
                     break
 
-    async def _process_one(self, push_action):
+    async def _process_one(self, push_action: dict) -> bool:
         if "notify" not in push_action["actions"]:
             return True
 
@@ -314,7 +316,9 @@ class HttpPusher:
                     await self.hs.remove_pusher(self.app_id, pk, self.user_id)
         return True
 
-    async def _build_notification_dict(self, event, tweaks, badge):
+    async def _build_notification_dict(
+        self, event: EventBase, tweaks: Dict[str, bool], badge: int
+    ) -> Dict[str, Any]:
         priority = "low"
         if (
             event.type == EventTypes.Encrypted
@@ -344,9 +348,7 @@ class HttpPusher:
             }
             return d
 
-        ctx = await push_tools.get_context_for_event(
-            self.storage, self.state_handler, event, self.user_id
-        )
+        ctx = await push_tools.get_context_for_event(self.storage, event, self.user_id)
 
         d = {
             "notification": {
@@ -386,7 +388,9 @@ class HttpPusher:
 
         return d
 
-    async def dispatch_push(self, event, tweaks, badge):
+    async def dispatch_push(
+        self, event: EventBase, tweaks: Dict[str, bool], badge: int
+    ) -> Union[bool, Iterable[str]]:
         notification_dict = await self._build_notification_dict(event, tweaks, badge)
         if not notification_dict:
             return []