From 4806651744616bf48abf408034ab9560e33f60ce Mon Sep 17 00:00:00 2001 From: Amber Brown Date: Tue, 23 Jul 2019 23:00:55 +1000 Subject: Replace returnValue with return (#5736) --- synapse/push/httppusher.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) (limited to 'synapse/push/httppusher.py') diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py index 4e7b6a5531..5b15b0dbe7 100644 --- a/synapse/push/httppusher.py +++ b/synapse/push/httppusher.py @@ -258,17 +258,17 @@ class HttpPusher(object): @defer.inlineCallbacks def _process_one(self, push_action): if "notify" not in push_action["actions"]: - defer.returnValue(True) + return True tweaks = push_rule_evaluator.tweaks_for_actions(push_action["actions"]) badge = yield push_tools.get_badge_count(self.hs.get_datastore(), self.user_id) event = yield self.store.get_event(push_action["event_id"], allow_none=True) if event is None: - defer.returnValue(True) # It's been redacted + return True # It's been redacted rejected = yield self.dispatch_push(event, tweaks, badge) if rejected is False: - defer.returnValue(False) + return False if isinstance(rejected, list) or isinstance(rejected, tuple): for pk in rejected: @@ -282,7 +282,7 @@ class HttpPusher(object): else: logger.info("Pushkey %s was rejected: removing", pk) yield self.hs.remove_pusher(self.app_id, pk, self.user_id) - defer.returnValue(True) + return True @defer.inlineCallbacks def _build_notification_dict(self, event, tweaks, badge): @@ -302,7 +302,7 @@ class HttpPusher(object): ], } } - defer.returnValue(d) + return d ctx = yield push_tools.get_context_for_event( self.store, self.state_handler, event, self.user_id @@ -345,13 +345,13 @@ class HttpPusher(object): if "name" in ctx and len(ctx["name"]) > 0: d["notification"]["room_name"] = ctx["name"] - defer.returnValue(d) + return d @defer.inlineCallbacks def dispatch_push(self, event, tweaks, badge): notification_dict = yield self._build_notification_dict(event, tweaks, badge) if not notification_dict: - defer.returnValue([]) + return [] try: resp = yield self.http_client.post_json_get_json( self.url, notification_dict @@ -364,11 +364,11 @@ class HttpPusher(object): type(e), e, ) - defer.returnValue(False) + return False rejected = [] if "rejected" in resp: rejected = resp["rejected"] - defer.returnValue(rejected) + return rejected @defer.inlineCallbacks def _send_badge(self, badge): -- cgit 1.5.1 From d02e41dcb299c7588bc9fa26bd0b5321fd7c5751 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 1 Aug 2019 13:41:27 +0100 Subject: Handle pusher being deleted during processing. Instead of throwing a StoreError lets break out of processing loop and mark the pusher as stopped. --- synapse/push/emailpusher.py | 19 +++++++++++++------ synapse/push/httppusher.py | 27 ++++++++++++++++++++------- synapse/storage/pusher.py | 30 ++++++++++++++++++++++-------- 3 files changed, 55 insertions(+), 21 deletions(-) (limited to 'synapse/push/httppusher.py') diff --git a/synapse/push/emailpusher.py b/synapse/push/emailpusher.py index 424ffa8b68..f688d4152d 100644 --- a/synapse/push/emailpusher.py +++ b/synapse/push/emailpusher.py @@ -234,13 +234,20 @@ class EmailPusher(object): return self.last_stream_ordering = last_stream_ordering - yield self.store.update_pusher_last_stream_ordering_and_success( - self.app_id, - self.email, - self.user_id, - last_stream_ordering, - self.clock.time_msec(), + pusher_still_exists = ( + yield self.store.update_pusher_last_stream_ordering_and_success( + self.app_id, + self.email, + self.user_id, + last_stream_ordering, + self.clock.time_msec(), + ) ) + if not pusher_still_exists: + # The pusher has been deleted while we were processing, so + # lets just stop and return. + self.on_stop() + return def seconds_until(self, ts_msec): secs = (ts_msec - self.clock.time_msec()) / 1000 diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py index 5b15b0dbe7..bd5d53af91 100644 --- a/synapse/push/httppusher.py +++ b/synapse/push/httppusher.py @@ -199,13 +199,21 @@ class HttpPusher(object): http_push_processed_counter.inc() self.backoff_delay = HttpPusher.INITIAL_BACKOFF_SEC self.last_stream_ordering = push_action["stream_ordering"] - yield self.store.update_pusher_last_stream_ordering_and_success( - self.app_id, - self.pushkey, - self.user_id, - self.last_stream_ordering, - self.clock.time_msec(), + pusher_still_exists = ( + yield self.store.update_pusher_last_stream_ordering_and_success( + self.app_id, + self.pushkey, + self.user_id, + self.last_stream_ordering, + self.clock.time_msec(), + ) ) + if not pusher_still_exists: + # The pusher has been deleted while we were processing, so + # lets just stop and return. + self.on_stop() + return + if self.failing_since: self.failing_since = None yield self.store.update_pusher_failing_since( @@ -234,12 +242,17 @@ class HttpPusher(object): ) self.backoff_delay = HttpPusher.INITIAL_BACKOFF_SEC self.last_stream_ordering = push_action["stream_ordering"] - yield self.store.update_pusher_last_stream_ordering( + pusher_still_exists = yield self.store.update_pusher_last_stream_ordering( self.app_id, self.pushkey, self.user_id, self.last_stream_ordering, ) + if not pusher_still_exists: + # The pusher has been deleted while we were processing, so + # lets just stop and return. + self.on_stop() + return self.failing_since = None yield self.store.update_pusher_failing_since( diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py index be3d4d9ded..888035fe86 100644 --- a/synapse/storage/pusher.py +++ b/synapse/storage/pusher.py @@ -308,22 +308,36 @@ class PusherStore(PusherWorkerStore): def update_pusher_last_stream_ordering_and_success( self, app_id, pushkey, user_id, last_stream_ordering, last_success ): - yield self._simple_update_one( - "pushers", - {"app_id": app_id, "pushkey": pushkey, "user_name": user_id}, - { + """Update the last stream ordering position we've processed up to for + the given pusher. + + Args: + app_id (str) + pushkey (str) + last_stream_ordering (int) + last_success (int) + + Returns: + Deferred[bool]: Whether the pusher stil exists or not. + """ + updated = yield self._simple_update( + table="pushers", + keyvalues={"app_id": app_id, "pushkey": pushkey, "user_name": user_id}, + updatevalues={ "last_stream_ordering": last_stream_ordering, "last_success": last_success, }, desc="update_pusher_last_stream_ordering_and_success", ) + return bool(updated) + @defer.inlineCallbacks def update_pusher_failing_since(self, app_id, pushkey, user_id, failing_since): - yield self._simple_update_one( - "pushers", - {"app_id": app_id, "pushkey": pushkey, "user_name": user_id}, - {"failing_since": failing_since}, + yield self._simple_update( + table="pushers", + keyvalues={"app_id": app_id, "pushkey": pushkey, "user_name": user_id}, + updatevalues={"failing_since": failing_since}, desc="update_pusher_failing_since", ) -- cgit 1.5.1