diff --git a/synapse/module_api/__init__.py b/synapse/module_api/__init__.py
index eeafea74d1..90eff030b5 100644
--- a/synapse/module_api/__init__.py
+++ b/synapse/module_api/__init__.py
@@ -105,6 +105,7 @@ from synapse.module_api.callbacks.spamchecker_callbacks import (
USER_MAY_SEND_3PID_INVITE_CALLBACK,
SpamCheckerModuleApiCallbacks,
)
+from synapse.push.httppusher import HttpPusher
from synapse.rest.client.login import LoginResponse
from synapse.storage import DataStore
from synapse.storage.background_updates import (
@@ -248,6 +249,7 @@ class ModuleApi:
self._registration_handler = hs.get_registration_handler()
self._send_email_handler = hs.get_send_email_handler()
self._push_rules_handler = hs.get_push_rules_handler()
+ self._pusherpool = hs.get_pusherpool()
self._device_handler = hs.get_device_handler()
self.custom_template_dir = hs.config.server.custom_template_directory
self._callbacks = hs.get_module_api_callbacks()
@@ -1225,6 +1227,50 @@ class ModuleApi:
await self._clock.sleep(seconds)
+ async def send_http_push_notification(
+ self,
+ user_id: str,
+ device_id: Optional[str],
+ content: JsonDict,
+ tweaks: Optional[JsonMapping] = None,
+ default_payload: Optional[JsonMapping] = None,
+ ) -> Dict[str, bool]:
+ """Send an HTTP push notification that is forwarded to the registered push gateway
+ for the specified user/device.
+
+ Added in Synapse v1.82.0.
+
+ Args:
+ user_id: The user ID to send the push notification to.
+ device_id: The device ID of the device where to send the push notification. If `None`,
+ the notification will be sent to all registered HTTP pushers of the user.
+ content: A dict of values that will be put in the `notification` field of the push
+ (cf Push Gateway spec). `devices` field will be overrided if included.
+ tweaks: A dict of `tweaks` that will be inserted in the `devices` section, cf spec.
+ default_payload: default payload to add in `devices[0].data.default_payload`.
+ This will be merged (and override if some matching values already exist there)
+ with existing `default_payload`.
+
+ Returns:
+ a dict reprensenting the status of the push per device ID
+ """
+ status = {}
+ if user_id in self._pusherpool.pushers:
+ for p in self._pusherpool.pushers[user_id].values():
+ if isinstance(p, HttpPusher) and (
+ not device_id or p.device_id == device_id
+ ):
+ res = await p.dispatch_push(content, tweaks, default_payload)
+ # Check if the push was successful and no pushers were rejected.
+ sent = res is not False and not res
+
+ # This is mainly to accomodate mypy
+ # device_id should never be empty after the `set_device_id_for_pushers`
+ # background job has been properly run.
+ if p.device_id:
+ status[p.device_id] = sent
+ return status
+
async def send_mail(
self,
recipient: str,
diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py
index b048b03a74..4f8fa445d9 100644
--- a/synapse/push/httppusher.py
+++ b/synapse/push/httppusher.py
@@ -14,7 +14,7 @@
# limitations under the License.
import logging
import urllib.parse
-from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Optional, Union
+from typing import TYPE_CHECKING, Dict, List, Optional, Union
from prometheus_client import Counter
@@ -27,6 +27,7 @@ from synapse.logging import opentracing
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.push import Pusher, PusherConfig, PusherConfigException
from synapse.storage.databases.main.event_push_actions import HttpPushAction
+from synapse.types import JsonDict, JsonMapping
from . import push_tools
@@ -56,7 +57,7 @@ http_badges_failed_counter = Counter(
)
-def tweaks_for_actions(actions: List[Union[str, Dict]]) -> Dict[str, Any]:
+def tweaks_for_actions(actions: List[Union[str, Dict]]) -> JsonMapping:
"""
Converts a list of actions into a `tweaks` dict (which can then be passed to
the push gateway).
@@ -101,6 +102,7 @@ class HttpPusher(Pusher):
self._storage_controllers = self.hs.get_storage_controllers()
self.app_display_name = pusher_config.app_display_name
self.device_display_name = pusher_config.device_display_name
+ self.device_id = pusher_config.device_id
self.pushkey_ts = pusher_config.ts
self.data = pusher_config.data
self.backoff_delay = HttpPusher.INITIAL_BACKOFF_SEC
@@ -324,7 +326,7 @@ class HttpPusher(Pusher):
event = await self.store.get_event(push_action.event_id, allow_none=True)
if event is None:
return True # It's been redacted
- rejected = await self.dispatch_push(event, tweaks, badge)
+ rejected = await self.dispatch_push_event(event, tweaks, badge)
if rejected is False:
return False
@@ -342,9 +344,83 @@ class HttpPusher(Pusher):
await self._pusherpool.remove_pusher(self.app_id, pk, self.user_id)
return True
- async def _build_notification_dict(
- self, event: EventBase, tweaks: Dict[str, bool], badge: int
- ) -> Dict[str, Any]:
+ async def dispatch_push(
+ self,
+ content: JsonDict,
+ tweaks: Optional[JsonMapping] = None,
+ default_payload: Optional[JsonMapping] = None,
+ ) -> Union[bool, List[str]]:
+ """Send a notification to the registered push gateway, with `content` being
+ the content of the `notification` top property specified in the spec.
+ Note that the `devices` property will be added with device-specific
+ information for this pusher.
+
+ Args:
+ content: the content
+ tweaks: tweaks to add into the `devices` section
+ default_payload: default payload to add in `devices[0].data.default_payload`.
+ This will be merged (and override if some matching values already exist there)
+ with existing `default_payload`.
+
+ Returns:
+ False if an error occured when calling the push gateway, or an array of
+ rejected push keys otherwise. If this array is empty, the push fully
+ succeeded.
+ """
+ content = content.copy()
+
+ data = self.data_minus_url.copy()
+ if default_payload:
+ data.setdefault("default_payload", {}).update(default_payload)
+
+ device = {
+ "app_id": self.app_id,
+ "pushkey": self.pushkey,
+ "pushkey_ts": int(self.pushkey_ts / 1000),
+ "data": data,
+ }
+ if tweaks:
+ device["tweaks"] = tweaks
+
+ content["devices"] = [device]
+
+ try:
+ resp = await self.http_client.post_json_get_json(
+ self.url, {"notification": content}
+ )
+ except Exception as e:
+ logger.warning(
+ "Failed to push data to %s: %s %s",
+ self.name,
+ type(e),
+ e,
+ )
+ return False
+ rejected = []
+ if "rejected" in resp:
+ rejected = resp["rejected"]
+ return rejected
+
+ async def dispatch_push_event(
+ self,
+ event: EventBase,
+ tweaks: JsonMapping,
+ badge: int,
+ ) -> Union[bool, List[str]]:
+ """Send a notification to the registered push gateway by building it
+ from an event.
+
+ Args:
+ event: the event
+ tweaks: tweaks to add into the `devices` section, used to decide the
+ push priority
+ badge: unread count to send with the push notification
+
+ Returns:
+ False if an error occured when calling the push gateway, or an array of
+ rejected push keys otherwise. If this array is empty, the push fully
+ succeeded.
+ """
priority = "low"
if (
event.type == EventTypes.Encrypted
@@ -358,30 +434,20 @@ class HttpPusher(Pusher):
# This was checked in the __init__, but mypy doesn't seem to know that.
assert self.data is not None
if self.data.get("format") == "event_id_only":
- d: Dict[str, Any] = {
- "notification": {
- "event_id": event.event_id,
- "room_id": event.room_id,
- "counts": {"unread": badge},
- "prio": priority,
- "devices": [
- {
- "app_id": self.app_id,
- "pushkey": self.pushkey,
- "pushkey_ts": int(self.pushkey_ts / 1000),
- "data": self.data_minus_url,
- }
- ],
- }
+ content: JsonDict = {
+ "event_id": event.event_id,
+ "room_id": event.room_id,
+ "counts": {"unread": badge},
+ "prio": priority,
}
- return d
-
- ctx = await push_tools.get_context_for_event(
- self._storage_controllers, event, self.user_id
- )
+ # event_id_only doesn't include the tweaks, so override them.
+ tweaks = {}
+ else:
+ ctx = await push_tools.get_context_for_event(
+ self._storage_controllers, event, self.user_id
+ )
- d = {
- "notification": {
+ content = {
"id": event.event_id, # deprecated: remove soon
"event_id": event.event_id,
"room_id": event.room_id,
@@ -392,57 +458,27 @@ class HttpPusher(Pusher):
"unread": badge,
# 'missed_calls': 2
},
- "devices": [
- {
- "app_id": self.app_id,
- "pushkey": self.pushkey,
- "pushkey_ts": int(self.pushkey_ts / 1000),
- "data": self.data_minus_url,
- "tweaks": tweaks,
- }
- ],
}
- }
- if event.type == "m.room.member" and event.is_state():
- d["notification"]["membership"] = event.content["membership"]
- d["notification"]["user_is_target"] = event.state_key == self.user_id
- if self.hs.config.push.push_include_content and event.content:
- d["notification"]["content"] = event.content
-
- # We no longer send aliases separately, instead, we send the human
- # readable name of the room, which may be an alias.
- if "sender_display_name" in ctx and len(ctx["sender_display_name"]) > 0:
- d["notification"]["sender_display_name"] = ctx["sender_display_name"]
- if "name" in ctx and len(ctx["name"]) > 0:
- d["notification"]["room_name"] = ctx["name"]
-
- return d
-
- async def dispatch_push(
- self, event: EventBase, tweaks: Dict[str, bool], badge: int
- ) -> Union[bool, Iterable[str]]:
- notification_dict = await self._build_notification_dict(event, tweaks, badge)
- if not notification_dict:
- return []
- try:
- resp = await self.http_client.post_json_get_json(
- self.url, notification_dict
- )
- except Exception as e:
- logger.warning(
- "Failed to push event %s to %s: %s %s",
- event.event_id,
- self.name,
- type(e),
- e,
- )
- return False
- rejected = []
- if "rejected" in resp:
- rejected = resp["rejected"]
- if not rejected:
+ if event.type == "m.room.member" and event.is_state():
+ content["membership"] = event.content["membership"]
+ content["user_is_target"] = event.state_key == self.user_id
+ if self.hs.config.push.push_include_content and event.content:
+ content["content"] = event.content
+
+ # We no longer send aliases separately, instead, we send the human
+ # readable name of the room, which may be an alias.
+ if "sender_display_name" in ctx and len(ctx["sender_display_name"]) > 0:
+ content["sender_display_name"] = ctx["sender_display_name"]
+ if "name" in ctx and len(ctx["name"]) > 0:
+ content["room_name"] = ctx["name"]
+
+ res = await self.dispatch_push(content, tweaks)
+
+ # If the push is successful and none are rejected, update the badge count.
+ if res is not False and not res:
self.badge_count_last_call = badge
- return rejected
+
+ return res
async def _send_badge(self, badge: int) -> None:
"""
|