diff options
Diffstat (limited to 'synapse/push')
-rw-r--r-- | synapse/push/action_generator.py | 6 | ||||
-rw-r--r-- | synapse/push/bulk_push_rule_evaluator.py | 15 | ||||
-rw-r--r-- | synapse/push/httppusher.py | 37 |
3 files changed, 42 insertions, 16 deletions
diff --git a/synapse/push/action_generator.py b/synapse/push/action_generator.py index fe09d50d55..8f619a7a1b 100644 --- a/synapse/push/action_generator.py +++ b/synapse/push/action_generator.py @@ -40,10 +40,6 @@ class ActionGenerator(object): @defer.inlineCallbacks def handle_push_actions_for_event(self, event, context): with Measure(self.clock, "action_for_event_by_user"): - actions_by_user = yield self.bulk_evaluator.action_for_event_by_user( + yield self.bulk_evaluator.action_for_event_by_user( event, context ) - - context.push_actions = [ - (uid, actions) for uid, actions in actions_by_user.iteritems() - ] diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py index 425a017bdf..7c680659b6 100644 --- a/synapse/push/bulk_push_rule_evaluator.py +++ b/synapse/push/bulk_push_rule_evaluator.py @@ -137,11 +137,11 @@ class BulkPushRuleEvaluator(object): @defer.inlineCallbacks def action_for_event_by_user(self, event, context): - """Given an event and context, evaluate the push rules and return - the results + """Given an event and context, evaluate the push rules and insert the + results into the event_push_actions_staging table. Returns: - dict of user_id -> action + Deferred """ rules_by_user = yield self._get_rules_for_event(event, context) actions_by_user = {} @@ -190,9 +190,16 @@ class BulkPushRuleEvaluator(object): if matches: actions = [x for x in rule['actions'] if x != 'dont_notify'] if actions and 'notify' in actions: + # Push rules say we should notify the user of this event actions_by_user[uid] = actions break - defer.returnValue(actions_by_user) + + # Mark in the DB staging area the push actions for users who should be + # notified for this event. (This will then get handled when we persist + # the event) + yield self.store.add_push_actions_to_staging( + event.event_id, actions_by_user, + ) def _condition_checker(evaluator, conditions, uid, display_name, cache): diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py index c16f61452c..2cbac571b8 100644 --- a/synapse/push/httppusher.py +++ b/synapse/push/httppusher.py @@ -13,21 +13,30 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - -from synapse.push import PusherConfigException +import logging from twisted.internet import defer, reactor from twisted.internet.error import AlreadyCalled, AlreadyCancelled -import logging import push_rule_evaluator import push_tools - +import synapse +from synapse.push import PusherConfigException from synapse.util.logcontext import LoggingContext from synapse.util.metrics import Measure logger = logging.getLogger(__name__) +metrics = synapse.metrics.get_metrics_for(__name__) + +http_push_processed_counter = metrics.register_counter( + "http_pushes_processed", +) + +http_push_failed_counter = metrics.register_counter( + "http_pushes_failed", +) + class HttpPusher(object): INITIAL_BACKOFF_SEC = 1 # in seconds because that's what Twisted takes @@ -152,9 +161,16 @@ class HttpPusher(object): self.user_id, self.last_stream_ordering, self.max_stream_ordering ) + logger.info( + "Processing %i unprocessed push actions for %s starting at " + "stream_ordering %s", + len(unprocessed), self.name, self.last_stream_ordering, + ) + for push_action in unprocessed: processed = yield self._process_one(push_action) if processed: + http_push_processed_counter.inc() self.backoff_delay = HttpPusher.INITIAL_BACKOFF_SEC self.last_stream_ordering = push_action['stream_ordering'] yield self.store.update_pusher_last_stream_ordering_and_success( @@ -169,6 +185,7 @@ class HttpPusher(object): self.failing_since ) else: + http_push_failed_counter.inc() if not self.failing_since: self.failing_since = self.clock.time_msec() yield self.store.update_pusher_failing_since( @@ -316,7 +333,10 @@ class HttpPusher(object): try: resp = yield self.http_client.post_json_get_json(self.url, notification_dict) except Exception: - logger.warn("Failed to push %s ", self.url) + logger.warn( + "Failed to push event %s to %s", + event.event_id, self.name, exc_info=True, + ) defer.returnValue(False) rejected = [] if 'rejected' in resp: @@ -325,7 +345,7 @@ class HttpPusher(object): @defer.inlineCallbacks def _send_badge(self, badge): - logger.info("Sending updated badge count %d to %r", badge, self.user_id) + logger.info("Sending updated badge count %d to %s", badge, self.name) d = { 'notification': { 'id': '', @@ -347,7 +367,10 @@ class HttpPusher(object): try: resp = yield self.http_client.post_json_get_json(self.url, d) except Exception: - logger.exception("Failed to push %s ", self.url) + logger.warn( + "Failed to send badge count to %s", + self.name, exc_info=True, + ) defer.returnValue(False) rejected = [] if 'rejected' in resp: |