From 93efd7eb04601ee103f176819283e0298c660adc Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Mon, 22 Jan 2018 18:14:10 +0000 Subject: logging and debug for http pusher --- synapse/push/httppusher.py | 24 ++++++++++++++++++++---- 1 file changed, 20 insertions(+), 4 deletions(-) (limited to 'synapse/push') diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py index c16f61452c..f2517f39a8 100644 --- a/synapse/push/httppusher.py +++ b/synapse/push/httppusher.py @@ -13,21 +13,30 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - -from synapse.push import PusherConfigException +import logging from twisted.internet import defer, reactor from twisted.internet.error import AlreadyCalled, AlreadyCancelled -import logging import push_rule_evaluator import push_tools - +import synapse +from synapse.push import PusherConfigException from synapse.util.logcontext import LoggingContext from synapse.util.metrics import Measure logger = logging.getLogger(__name__) +metrics = synapse.metrics.get_metrics_for(__name__) + +http_push_processed_counter = metrics.register_counter( + "http_pushes_processed", +) + +http_push_failed_counter = metrics.register_counter( + "http_pushes_failed", +) + class HttpPusher(object): INITIAL_BACKOFF_SEC = 1 # in seconds because that's what Twisted takes @@ -152,9 +161,15 @@ class HttpPusher(object): self.user_id, self.last_stream_ordering, self.max_stream_ordering ) + logger.info( + "Processing %i unprocessed push actions starting at stream_ordering %i", + len(unprocessed), self.last_stream_ordering, + ) + for push_action in unprocessed: processed = yield self._process_one(push_action) if processed: + http_push_processed_counter.inc() self.backoff_delay = HttpPusher.INITIAL_BACKOFF_SEC self.last_stream_ordering = push_action['stream_ordering'] yield self.store.update_pusher_last_stream_ordering_and_success( @@ -169,6 +184,7 @@ class HttpPusher(object): self.failing_since ) else: + http_push_failed_counter.inc() if not self.failing_since: self.failing_since = self.clock.time_msec() yield self.store.update_pusher_failing_since( -- cgit 1.4.1 From 4528dd2443f4dc9e737bf4eeccedfb8807a1ea2c Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Mon, 22 Jan 2018 20:15:42 +0000 Subject: Fix logging and add user_id --- synapse/push/httppusher.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) (limited to 'synapse/push') diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py index f2517f39a8..4a03af5b21 100644 --- a/synapse/push/httppusher.py +++ b/synapse/push/httppusher.py @@ -162,8 +162,9 @@ class HttpPusher(object): ) logger.info( - "Processing %i unprocessed push actions starting at stream_ordering %i", - len(unprocessed), self.last_stream_ordering, + "Processing %i unprocessed push actions for %s starting at " + "stream_ordering %s", + len(unprocessed), self.user_id, self.last_stream_ordering, ) for push_action in unprocessed: -- cgit 1.4.1 From e051abd20b1978ddc53723c8233bc54742243045 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Wed, 24 Jan 2018 21:06:54 +0000 Subject: add appid/device_display_name to to pusher logging --- synapse/push/httppusher.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'synapse/push') diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py index 4a03af5b21..02bd013ca3 100644 --- a/synapse/push/httppusher.py +++ b/synapse/push/httppusher.py @@ -164,7 +164,7 @@ class HttpPusher(object): logger.info( "Processing %i unprocessed push actions for %s starting at " "stream_ordering %s", - len(unprocessed), self.user_id, self.last_stream_ordering, + len(unprocessed), self.name, self.last_stream_ordering, ) for push_action in unprocessed: @@ -342,7 +342,7 @@ class HttpPusher(object): @defer.inlineCallbacks def _send_badge(self, badge): - logger.info("Sending updated badge count %d to %r", badge, self.user_id) + logger.info("Sending updated badge count %d to %s", badge, self.name) d = { 'notification': { 'id': '', -- cgit 1.4.1 From 03dd745fe28a00c8788a2147d4f5c2a852182429 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Mon, 29 Jan 2018 15:49:06 +0000 Subject: Better logging when pushes fail --- synapse/push/httppusher.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) (limited to 'synapse/push') diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py index 02bd013ca3..2cbac571b8 100644 --- a/synapse/push/httppusher.py +++ b/synapse/push/httppusher.py @@ -333,7 +333,10 @@ class HttpPusher(object): try: resp = yield self.http_client.post_json_get_json(self.url, notification_dict) except Exception: - logger.warn("Failed to push %s ", self.url) + logger.warn( + "Failed to push event %s to %s", + event.event_id, self.name, exc_info=True, + ) defer.returnValue(False) rejected = [] if 'rejected' in resp: @@ -364,7 +367,10 @@ class HttpPusher(object): try: resp = yield self.http_client.post_json_get_json(self.url, d) except Exception: - logger.exception("Failed to push %s ", self.url) + logger.warn( + "Failed to send badge count to %s", + self.name, exc_info=True, + ) defer.returnValue(False) rejected = [] if 'rejected' in resp: -- cgit 1.4.1 From acac21248cf1834233831383ee52198ca1bd010c Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 15 Feb 2018 15:01:12 +0000 Subject: Store push actions in staging area --- synapse/push/bulk_push_rule_evaluator.py | 3 +++ synapse/storage/event_push_actions.py | 27 ++++++++++++++++++++++ .../schema/delta/47/push_actions_staging.sql | 24 +++++++++++++++++++ 3 files changed, 54 insertions(+) create mode 100644 synapse/storage/schema/delta/47/push_actions_staging.sql (limited to 'synapse/push') diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py index 425a017bdf..841ccbd1f1 100644 --- a/synapse/push/bulk_push_rule_evaluator.py +++ b/synapse/push/bulk_push_rule_evaluator.py @@ -191,6 +191,9 @@ class BulkPushRuleEvaluator(object): actions = [x for x in rule['actions'] if x != 'dont_notify'] if actions and 'notify' in actions: actions_by_user[uid] = actions + yield self.store.add_push_actions_to_staging( + event.event_id, uid, actions, + ) break defer.returnValue(actions_by_user) diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index 8efe2fd4bb..80c3cfe95f 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -738,6 +738,33 @@ class EventPushActionsStore(SQLBaseStore): (rotate_to_stream_ordering,) ) + def add_push_actions_to_staging(self, event_id, user_id, actions): + """Add the push actions for the user and event to the push + action staging area. + + Args: + event_id (str) + user_id (str) + actions (list) + + Returns: + Deferred + """ + + is_highlight = _action_has_highlight(actions) + + return self._simple_insert( + table="event_push_actions_staging", + values={ + "event_id": event_id, + "user_id": user_id, + "actions": _serialize_action(actions, is_highlight), + "notif": True, + "highlight": is_highlight, + }, + desc="add_push_actions_to_staging", + ) + def _action_has_highlight(actions): for action in actions: diff --git a/synapse/storage/schema/delta/47/push_actions_staging.sql b/synapse/storage/schema/delta/47/push_actions_staging.sql new file mode 100644 index 0000000000..ec4b1d7d42 --- /dev/null +++ b/synapse/storage/schema/delta/47/push_actions_staging.sql @@ -0,0 +1,24 @@ +/* Copyright 2018 New Vector Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +CREATE TABLE event_push_actions_staging ( + event_id TEXT NOT NULL, + user_id TEXT NOT NULL, + actions TEXT NOT NULL, + notif SMALLINT NOT NULL, + highlight SMALLINT NOT NULL +); + +CREATE INDEX event_push_actions_staging_id ON event_push_actions_staging(event_id); -- cgit 1.4.1 From 4810f7effd0fc3fd97f9edaf8ea0af48477adb0a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 15 Feb 2018 15:18:37 +0000 Subject: Remove context.push_actions --- synapse/events/snapshot.py | 4 ---- synapse/push/action_generator.py | 6 +----- synapse/push/bulk_push_rule_evaluator.py | 9 +++------ synapse/storage/events.py | 7 +++---- tests/replication/slave/storage/test_events.py | 5 ++++- 5 files changed, 11 insertions(+), 20 deletions(-) (limited to 'synapse/push') diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py index f9445bef13..8e684d91b5 100644 --- a/synapse/events/snapshot.py +++ b/synapse/events/snapshot.py @@ -52,7 +52,6 @@ class EventContext(object): "prev_state_ids", "state_group", "rejected", - "push_actions", "prev_group", "delta_ids", "prev_state_events", @@ -67,7 +66,6 @@ class EventContext(object): self.state_group = None self.rejected = False - self.push_actions = [] # A previously persisted state group and a delta between that # and this state. @@ -104,7 +102,6 @@ class EventContext(object): "event_state_key": event.state_key if event.is_state() else None, "state_group": self.state_group, "rejected": self.rejected, - "push_actions": self.push_actions, "prev_group": self.prev_group, "delta_ids": _encode_state_dict(self.delta_ids), "prev_state_events": self.prev_state_events, @@ -127,7 +124,6 @@ class EventContext(object): context = EventContext() context.state_group = input["state_group"] context.rejected = input["rejected"] - context.push_actions = input["push_actions"] context.prev_group = input["prev_group"] context.delta_ids = _decode_state_dict(input["delta_ids"]) context.prev_state_events = input["prev_state_events"] diff --git a/synapse/push/action_generator.py b/synapse/push/action_generator.py index fe09d50d55..8f619a7a1b 100644 --- a/synapse/push/action_generator.py +++ b/synapse/push/action_generator.py @@ -40,10 +40,6 @@ class ActionGenerator(object): @defer.inlineCallbacks def handle_push_actions_for_event(self, event, context): with Measure(self.clock, "action_for_event_by_user"): - actions_by_user = yield self.bulk_evaluator.action_for_event_by_user( + yield self.bulk_evaluator.action_for_event_by_user( event, context ) - - context.push_actions = [ - (uid, actions) for uid, actions in actions_by_user.iteritems() - ] diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py index 841ccbd1f1..1140788aa7 100644 --- a/synapse/push/bulk_push_rule_evaluator.py +++ b/synapse/push/bulk_push_rule_evaluator.py @@ -137,14 +137,13 @@ class BulkPushRuleEvaluator(object): @defer.inlineCallbacks def action_for_event_by_user(self, event, context): - """Given an event and context, evaluate the push rules and return - the results + """Given an event and context, evaluate the push rules and insert the + results into the event_push_actions_staging table. Returns: - dict of user_id -> action + Deferred """ rules_by_user = yield self._get_rules_for_event(event, context) - actions_by_user = {} room_members = yield self.store.get_joined_users_from_context( event, context @@ -190,12 +189,10 @@ class BulkPushRuleEvaluator(object): if matches: actions = [x for x in rule['actions'] if x != 'dont_notify'] if actions and 'notify' in actions: - actions_by_user[uid] = actions yield self.store.add_push_actions_to_staging( event.event_id, uid, actions, ) break - defer.returnValue(actions_by_user) def _condition_checker(evaluator, conditions, uid, display_name, cache): diff --git a/synapse/storage/events.py b/synapse/storage/events.py index ca64aacb1c..52b7b34749 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -1168,10 +1168,9 @@ class EventsStore(SQLBaseStore): for event, context in events_and_contexts: # Insert all the push actions into the event_push_actions table. - if context.push_actions: - self._set_push_actions_for_event_and_users_txn( - txn, event, - ) + self._set_push_actions_for_event_and_users_txn( + txn, event, + ) if event.type == EventTypes.Redaction and event.redacts is not None: # Remove the entries in the event_push_actions table for the diff --git a/tests/replication/slave/storage/test_events.py b/tests/replication/slave/storage/test_events.py index f430cce931..4780f2ab72 100644 --- a/tests/replication/slave/storage/test_events.py +++ b/tests/replication/slave/storage/test_events.py @@ -230,7 +230,10 @@ class SlavedEventStoreTestCase(BaseSlavedStoreTestCase): state_handler = self.hs.get_state_handler() context = yield state_handler.compute_event_context(event) - context.push_actions = push_actions + for user_id, actions in push_actions: + yield self.master_store.add_push_actions_to_staging( + event.event_id, user_id, actions, + ) ordering = None if backfill: -- cgit 1.4.1 From 012e8e142a4ca7d87e1ffd66cce44b23bf943e9c Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 16 Feb 2018 11:35:01 +0000 Subject: Comments --- synapse/push/bulk_push_rule_evaluator.py | 3 +++ synapse/storage/event_push_actions.py | 3 ++- synapse/storage/schema/delta/47/push_actions_staging.sql | 4 ++++ 3 files changed, 9 insertions(+), 1 deletion(-) (limited to 'synapse/push') diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py index 1140788aa7..bf4f1c5836 100644 --- a/synapse/push/bulk_push_rule_evaluator.py +++ b/synapse/push/bulk_push_rule_evaluator.py @@ -189,6 +189,9 @@ class BulkPushRuleEvaluator(object): if matches: actions = [x for x in rule['actions'] if x != 'dont_notify'] if actions and 'notify' in actions: + # Push rules say we should notify the user of this event, + # so we mark it in the DB in the staging area. (This + # will then get handled when we persist the event) yield self.store.add_push_actions_to_staging( event.event_id, uid, actions, ) diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index 28226455bf..ea56d4d065 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -762,7 +762,8 @@ class EventPushActionsStore(SQLBaseStore): Args: event_id (str) user_id (str) - actions (list) + actions (list[dict|str]): An action can either be a string or + dict. Returns: Deferred diff --git a/synapse/storage/schema/delta/47/push_actions_staging.sql b/synapse/storage/schema/delta/47/push_actions_staging.sql index ec4b1d7d42..edccf4a96f 100644 --- a/synapse/storage/schema/delta/47/push_actions_staging.sql +++ b/synapse/storage/schema/delta/47/push_actions_staging.sql @@ -13,6 +13,10 @@ * limitations under the License. */ +-- Temporary staging area for push actions that have been calculated for an +-- event, but the event hasn't yet been persisted. +-- When the event is persisted the rows are moved over to the +-- event_push_actions table. CREATE TABLE event_push_actions_staging ( event_id TEXT NOT NULL, user_id TEXT NOT NULL, -- cgit 1.4.1 From 6ff8c87484d13c00fddc87b0bcc3f4cd691c81ff Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 20 Feb 2018 11:30:54 +0000 Subject: Batch inserts into event_push_actions_staging --- synapse/push/bulk_push_rule_evaluator.py | 15 +++++---- synapse/storage/event_push_actions.py | 53 ++++++++++++++++++++++---------- 2 files changed, 45 insertions(+), 23 deletions(-) (limited to 'synapse/push') diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py index bf4f1c5836..64e9a1da57 100644 --- a/synapse/push/bulk_push_rule_evaluator.py +++ b/synapse/push/bulk_push_rule_evaluator.py @@ -144,6 +144,7 @@ class BulkPushRuleEvaluator(object): Deferred """ rules_by_user = yield self._get_rules_for_event(event, context) + actions_by_user = {} room_members = yield self.store.get_joined_users_from_context( event, context @@ -189,14 +190,16 @@ class BulkPushRuleEvaluator(object): if matches: actions = [x for x in rule['actions'] if x != 'dont_notify'] if actions and 'notify' in actions: - # Push rules say we should notify the user of this event, - # so we mark it in the DB in the staging area. (This - # will then get handled when we persist the event) - yield self.store.add_push_actions_to_staging( - event.event_id, uid, actions, - ) + actions_by_user[uid] = actions break + # Push rules say we should notify the user of this event, + # so we mark it in the DB in the staging area. (This + # will then get handled when we persist the event) + yield self.store.add_push_actions_to_staging( + event.event_id, actions_by_user, + ) + def _condition_checker(evaluator, conditions, uid, display_name, cache): for cond in conditions: diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index f787431b7a..04e8836e6e 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -755,32 +755,51 @@ class EventPushActionsStore(SQLBaseStore): (rotate_to_stream_ordering,) ) - def add_push_actions_to_staging(self, event_id, user_id, actions): - """Add the push actions for the user and event to the push - action staging area. + def add_push_actions_to_staging(self, event_id, user_id_actions): + """Add the push actions for the event to the push action staging area. Args: event_id (str) - user_id (str) - actions (list[dict|str]): An action can either be a string or - dict. + user_id_actions (dict[str, list[dict|str])]): A dictionary mapping + user_id to list of push actions, where an action can either be + a string or dict. Returns: Deferred """ - is_highlight = 1 if _action_has_highlight(actions) else 0 + if not user_id_actions: + return - return self._simple_insert( - table="event_push_actions_staging", - values={ - "event_id": event_id, - "user_id": user_id, - "actions": _serialize_action(actions, is_highlight), - "notif": 1, - "highlight": is_highlight, - }, - desc="add_push_actions_to_staging", + # This is a helper function for generating the necessary tuple that + # can be used to inert into the `event_push_actions_staging` table. + def _gen_entry(user_id, actions): + is_highlight = 1 if _action_has_highlight(actions) else 0 + return ( + event_id, # event_id column + user_id, # user_id column + _serialize_action(actions, is_highlight), # actions column + 1, # notif column + is_highlight, # highlight column + ) + + def _add_push_actions_to_staging_txn(txn): + # We don't use _simple_insert_many here to avoid the overhead + # of generating lists of dicts. + + sql = """ + INSERT INTO event_push_actions_staging + (event_id, user_id, actions, notif, highlight) + VALUES (?, ?, ?, ?, ?) + """ + + txn.executemany(sql, ( + _gen_entry(user_id, actions) + for user_id, actions in user_id_actions.iteritems() + )) + + return self.runInteraction( + "add_push_actions_to_staging", _add_push_actions_to_staging_txn ) def remove_push_actions_from_staging(self, event_id): -- cgit 1.4.1 From 573712da6b720cb808e61e4cbd5426e85e58a161 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 21 Feb 2018 11:29:49 +0000 Subject: Update comments --- synapse/push/bulk_push_rule_evaluator.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) (limited to 'synapse/push') diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py index 64e9a1da57..7c680659b6 100644 --- a/synapse/push/bulk_push_rule_evaluator.py +++ b/synapse/push/bulk_push_rule_evaluator.py @@ -190,12 +190,13 @@ class BulkPushRuleEvaluator(object): if matches: actions = [x for x in rule['actions'] if x != 'dont_notify'] if actions and 'notify' in actions: + # Push rules say we should notify the user of this event actions_by_user[uid] = actions break - # Push rules say we should notify the user of this event, - # so we mark it in the DB in the staging area. (This - # will then get handled when we persist the event) + # Mark in the DB staging area the push actions for users who should be + # notified for this event. (This will then get handled when we persist + # the event) yield self.store.add_push_actions_to_staging( event.event_id, actions_by_user, ) -- cgit 1.4.1