summary refs log tree commit diff
path: root/synapse/push/httppusher.py
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2019-06-20 11:59:14 +0100
committerErik Johnston <erik@matrix.org>2019-06-20 11:59:14 +0100
commit45f28a9d2fc0466dcf2a05b0063b7caa3b7e12c3 (patch)
tree07bb21377c6611db89f64f948a2e27645662ff0e /synapse/push/httppusher.py
parentAdd descriptions and remove redundant set(..) (diff)
parentRun Black. (#5482) (diff)
downloadsynapse-45f28a9d2fc0466dcf2a05b0063b7caa3b7e12c3.tar.xz
Merge branch 'develop' of github.com:matrix-org/synapse into erikj/histogram_extremities
Diffstat (limited to 'synapse/push/httppusher.py')
-rw-r--r--synapse/push/httppusher.py226
1 files changed, 110 insertions, 116 deletions
diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py
index fac05aa44c..4e7b6a5531 100644
--- a/synapse/push/httppusher.py
+++ b/synapse/push/httppusher.py
@@ -65,16 +65,16 @@ class HttpPusher(object):
         self.store = self.hs.get_datastore()
         self.clock = self.hs.get_clock()
         self.state_handler = self.hs.get_state_handler()
-        self.user_id = pusherdict['user_name']
-        self.app_id = pusherdict['app_id']
-        self.app_display_name = pusherdict['app_display_name']
-        self.device_display_name = pusherdict['device_display_name']
-        self.pushkey = pusherdict['pushkey']
-        self.pushkey_ts = pusherdict['ts']
-        self.data = pusherdict['data']
-        self.last_stream_ordering = pusherdict['last_stream_ordering']
+        self.user_id = pusherdict["user_name"]
+        self.app_id = pusherdict["app_id"]
+        self.app_display_name = pusherdict["app_display_name"]
+        self.device_display_name = pusherdict["device_display_name"]
+        self.pushkey = pusherdict["pushkey"]
+        self.pushkey_ts = pusherdict["ts"]
+        self.data = pusherdict["data"]
+        self.last_stream_ordering = pusherdict["last_stream_ordering"]
         self.backoff_delay = HttpPusher.INITIAL_BACKOFF_SEC
-        self.failing_since = pusherdict['failing_since']
+        self.failing_since = pusherdict["failing_since"]
         self.timed_call = None
         self._is_processing = False
 
@@ -85,32 +85,26 @@ class HttpPusher(object):
         # off as None though as we don't know any better.
         self.max_stream_ordering = None
 
-        if 'data' not in pusherdict:
-            raise PusherConfigException(
-                "No 'data' key for HTTP pusher"
-            )
-        self.data = pusherdict['data']
+        if "data" not in pusherdict:
+            raise PusherConfigException("No 'data' key for HTTP pusher")
+        self.data = pusherdict["data"]
 
         self.name = "%s/%s/%s" % (
-            pusherdict['user_name'],
-            pusherdict['app_id'],
-            pusherdict['pushkey'],
+            pusherdict["user_name"],
+            pusherdict["app_id"],
+            pusherdict["pushkey"],
         )
 
         if self.data is None:
-            raise PusherConfigException(
-                "data can not be null for HTTP pusher"
-            )
+            raise PusherConfigException("data can not be null for HTTP pusher")
 
-        if 'url' not in self.data:
-            raise PusherConfigException(
-                "'url' required in data for HTTP pusher"
-            )
-        self.url = self.data['url']
+        if "url" not in self.data:
+            raise PusherConfigException("'url' required in data for HTTP pusher")
+        self.url = self.data["url"]
         self.http_client = hs.get_simple_http_client()
         self.data_minus_url = {}
         self.data_minus_url.update(self.data)
-        del self.data_minus_url['url']
+        del self.data_minus_url["url"]
 
     def on_started(self, should_check_for_notifs):
         """Called when this pusher has been started.
@@ -124,7 +118,9 @@ class HttpPusher(object):
             self._start_processing()
 
     def on_new_notifications(self, min_stream_ordering, max_stream_ordering):
-        self.max_stream_ordering = max(max_stream_ordering, self.max_stream_ordering or 0)
+        self.max_stream_ordering = max(
+            max_stream_ordering, self.max_stream_ordering or 0
+        )
         self._start_processing()
 
     def on_new_receipts(self, min_stream_id, max_stream_id):
@@ -192,7 +188,9 @@ class HttpPusher(object):
         logger.info(
             "Processing %i unprocessed push actions for %s starting at "
             "stream_ordering %s",
-            len(unprocessed), self.name, self.last_stream_ordering,
+            len(unprocessed),
+            self.name,
+            self.last_stream_ordering,
         )
 
         for push_action in unprocessed:
@@ -200,71 +198,72 @@ class HttpPusher(object):
             if processed:
                 http_push_processed_counter.inc()
                 self.backoff_delay = HttpPusher.INITIAL_BACKOFF_SEC
-                self.last_stream_ordering = push_action['stream_ordering']
+                self.last_stream_ordering = push_action["stream_ordering"]
                 yield self.store.update_pusher_last_stream_ordering_and_success(
-                    self.app_id, self.pushkey, self.user_id,
+                    self.app_id,
+                    self.pushkey,
+                    self.user_id,
                     self.last_stream_ordering,
-                    self.clock.time_msec()
+                    self.clock.time_msec(),
                 )
                 if self.failing_since:
                     self.failing_since = None
                     yield self.store.update_pusher_failing_since(
-                        self.app_id, self.pushkey, self.user_id,
-                        self.failing_since
+                        self.app_id, self.pushkey, self.user_id, self.failing_since
                     )
             else:
                 http_push_failed_counter.inc()
                 if not self.failing_since:
                     self.failing_since = self.clock.time_msec()
                     yield self.store.update_pusher_failing_since(
-                        self.app_id, self.pushkey, self.user_id,
-                        self.failing_since
+                        self.app_id, self.pushkey, self.user_id, self.failing_since
                     )
 
                 if (
-                    self.failing_since and
-                    self.failing_since <
-                    self.clock.time_msec() - HttpPusher.GIVE_UP_AFTER_MS
+                    self.failing_since
+                    and self.failing_since
+                    < self.clock.time_msec() - HttpPusher.GIVE_UP_AFTER_MS
                 ):
                     # we really only give up so that if the URL gets
                     # fixed, we don't suddenly deliver a load
                     # of old notifications.
-                    logger.warn("Giving up on a notification to user %s, "
-                                "pushkey %s",
-                                self.user_id, self.pushkey)
+                    logger.warn(
+                        "Giving up on a notification to user %s, " "pushkey %s",
+                        self.user_id,
+                        self.pushkey,
+                    )
                     self.backoff_delay = HttpPusher.INITIAL_BACKOFF_SEC
-                    self.last_stream_ordering = push_action['stream_ordering']
+                    self.last_stream_ordering = push_action["stream_ordering"]
                     yield self.store.update_pusher_last_stream_ordering(
                         self.app_id,
                         self.pushkey,
                         self.user_id,
-                        self.last_stream_ordering
+                        self.last_stream_ordering,
                     )
 
                     self.failing_since = None
                     yield self.store.update_pusher_failing_since(
-                        self.app_id,
-                        self.pushkey,
-                        self.user_id,
-                        self.failing_since
+                        self.app_id, self.pushkey, self.user_id, self.failing_since
                     )
                 else:
                     logger.info("Push failed: delaying for %ds", self.backoff_delay)
                     self.timed_call = self.hs.get_reactor().callLater(
                         self.backoff_delay, self.on_timer
                     )
-                    self.backoff_delay = min(self.backoff_delay * 2, self.MAX_BACKOFF_SEC)
+                    self.backoff_delay = min(
+                        self.backoff_delay * 2, self.MAX_BACKOFF_SEC
+                    )
                     break
 
     @defer.inlineCallbacks
     def _process_one(self, push_action):
-        if 'notify' not in push_action['actions']:
+        if "notify" not in push_action["actions"]:
             defer.returnValue(True)
 
-        tweaks = push_rule_evaluator.tweaks_for_actions(push_action['actions'])
+        tweaks = push_rule_evaluator.tweaks_for_actions(push_action["actions"])
         badge = yield push_tools.get_badge_count(self.hs.get_datastore(), self.user_id)
 
-        event = yield self.store.get_event(push_action['event_id'], allow_none=True)
+        event = yield self.store.get_event(push_action["event_id"], allow_none=True)
         if event is None:
             defer.returnValue(True)  # It's been redacted
         rejected = yield self.dispatch_push(event, tweaks, badge)
@@ -277,37 +276,30 @@ class HttpPusher(object):
                     # for sanity, we only remove the pushkey if it
                     # was the one we actually sent...
                     logger.warn(
-                        ("Ignoring rejected pushkey %s because we"
-                         " didn't send it"), pk
+                        ("Ignoring rejected pushkey %s because we" " didn't send it"),
+                        pk,
                     )
                 else:
-                    logger.info(
-                        "Pushkey %s was rejected: removing",
-                        pk
-                    )
-                    yield self.hs.remove_pusher(
-                        self.app_id, pk, self.user_id
-                    )
+                    logger.info("Pushkey %s was rejected: removing", pk)
+                    yield self.hs.remove_pusher(self.app_id, pk, self.user_id)
         defer.returnValue(True)
 
     @defer.inlineCallbacks
     def _build_notification_dict(self, event, tweaks, badge):
-        if self.data.get('format') == 'event_id_only':
+        if self.data.get("format") == "event_id_only":
             d = {
-                'notification': {
-                    'event_id': event.event_id,
-                    'room_id': event.room_id,
-                    'counts': {
-                        'unread': badge,
-                    },
-                    'devices': [
+                "notification": {
+                    "event_id": event.event_id,
+                    "room_id": event.room_id,
+                    "counts": {"unread": badge},
+                    "devices": [
                         {
-                            'app_id': self.app_id,
-                            'pushkey': self.pushkey,
-                            'pushkey_ts': long(self.pushkey_ts / 1000),
-                            'data': self.data_minus_url,
+                            "app_id": self.app_id,
+                            "pushkey": self.pushkey,
+                            "pushkey_ts": long(self.pushkey_ts / 1000),
+                            "data": self.data_minus_url,
                         }
-                    ]
+                    ],
                 }
             }
             defer.returnValue(d)
@@ -317,41 +309,41 @@ class HttpPusher(object):
         )
 
         d = {
-            'notification': {
-                'id': event.event_id,  # deprecated: remove soon
-                'event_id': event.event_id,
-                'room_id': event.room_id,
-                'type': event.type,
-                'sender': event.user_id,
-                'counts': {  # -- we don't mark messages as read yet so
-                             # we have no way of knowing
+            "notification": {
+                "id": event.event_id,  # deprecated: remove soon
+                "event_id": event.event_id,
+                "room_id": event.room_id,
+                "type": event.type,
+                "sender": event.user_id,
+                "counts": {  # -- we don't mark messages as read yet so
+                    # we have no way of knowing
                     # Just set the badge to 1 until we have read receipts
-                    'unread': badge,
+                    "unread": badge,
                     # 'missed_calls': 2
                 },
-                'devices': [
+                "devices": [
                     {
-                        'app_id': self.app_id,
-                        'pushkey': self.pushkey,
-                        'pushkey_ts': long(self.pushkey_ts / 1000),
-                        'data': self.data_minus_url,
-                        'tweaks': tweaks
+                        "app_id": self.app_id,
+                        "pushkey": self.pushkey,
+                        "pushkey_ts": long(self.pushkey_ts / 1000),
+                        "data": self.data_minus_url,
+                        "tweaks": tweaks,
                     }
-                ]
+                ],
             }
         }
-        if event.type == 'm.room.member' and event.is_state():
-            d['notification']['membership'] = event.content['membership']
-            d['notification']['user_is_target'] = event.state_key == self.user_id
+        if event.type == "m.room.member" and event.is_state():
+            d["notification"]["membership"] = event.content["membership"]
+            d["notification"]["user_is_target"] = event.state_key == self.user_id
         if self.hs.config.push_include_content and event.content:
-            d['notification']['content'] = event.content
+            d["notification"]["content"] = event.content
 
         # We no longer send aliases separately, instead, we send the human
         # readable name of the room, which may be an alias.
-        if 'sender_display_name' in ctx and len(ctx['sender_display_name']) > 0:
-            d['notification']['sender_display_name'] = ctx['sender_display_name']
-        if 'name' in ctx and len(ctx['name']) > 0:
-            d['notification']['room_name'] = ctx['name']
+        if "sender_display_name" in ctx and len(ctx["sender_display_name"]) > 0:
+            d["notification"]["sender_display_name"] = ctx["sender_display_name"]
+        if "name" in ctx and len(ctx["name"]) > 0:
+            d["notification"]["room_name"] = ctx["name"]
 
         defer.returnValue(d)
 
@@ -361,16 +353,21 @@ class HttpPusher(object):
         if not notification_dict:
             defer.returnValue([])
         try:
-            resp = yield self.http_client.post_json_get_json(self.url, notification_dict)
+            resp = yield self.http_client.post_json_get_json(
+                self.url, notification_dict
+            )
         except Exception as e:
             logger.warning(
                 "Failed to push event %s to %s: %s %s",
-                event.event_id, self.name, type(e), e,
+                event.event_id,
+                self.name,
+                type(e),
+                e,
             )
             defer.returnValue(False)
         rejected = []
-        if 'rejected' in resp:
-            rejected = resp['rejected']
+        if "rejected" in resp:
+            rejected = resp["rejected"]
         defer.returnValue(rejected)
 
     @defer.inlineCallbacks
@@ -381,21 +378,19 @@ class HttpPusher(object):
         """
         logger.info("Sending updated badge count %d to %s", badge, self.name)
         d = {
-            'notification': {
-                'id': '',
-                'type': None,
-                'sender': '',
-                'counts': {
-                    'unread': badge
-                },
-                'devices': [
+            "notification": {
+                "id": "",
+                "type": None,
+                "sender": "",
+                "counts": {"unread": badge},
+                "devices": [
                     {
-                        'app_id': self.app_id,
-                        'pushkey': self.pushkey,
-                        'pushkey_ts': long(self.pushkey_ts / 1000),
-                        'data': self.data_minus_url,
+                        "app_id": self.app_id,
+                        "pushkey": self.pushkey,
+                        "pushkey_ts": long(self.pushkey_ts / 1000),
+                        "data": self.data_minus_url,
                     }
-                ]
+                ],
             }
         }
         try:
@@ -403,7 +398,6 @@ class HttpPusher(object):
             http_badges_processed_counter.inc()
         except Exception as e:
             logger.warning(
-                "Failed to send badge count to %s: %s %s",
-                self.name, type(e), e,
+                "Failed to send badge count to %s: %s %s", self.name, type(e), e
             )
             http_badges_failed_counter.inc()