diff --git a/synapse/push/action_generator.py b/synapse/push/action_generator.py
index fe09d50d55..8f619a7a1b 100644
--- a/synapse/push/action_generator.py
+++ b/synapse/push/action_generator.py
@@ -40,10 +40,6 @@ class ActionGenerator(object):
@defer.inlineCallbacks
def handle_push_actions_for_event(self, event, context):
with Measure(self.clock, "action_for_event_by_user"):
- actions_by_user = yield self.bulk_evaluator.action_for_event_by_user(
+ yield self.bulk_evaluator.action_for_event_by_user(
event, context
)
-
- context.push_actions = [
- (uid, actions) for uid, actions in actions_by_user.iteritems()
- ]
diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py
index 425a017bdf..7c680659b6 100644
--- a/synapse/push/bulk_push_rule_evaluator.py
+++ b/synapse/push/bulk_push_rule_evaluator.py
@@ -137,11 +137,11 @@ class BulkPushRuleEvaluator(object):
@defer.inlineCallbacks
def action_for_event_by_user(self, event, context):
- """Given an event and context, evaluate the push rules and return
- the results
+ """Given an event and context, evaluate the push rules and insert the
+ results into the event_push_actions_staging table.
Returns:
- dict of user_id -> action
+ Deferred
"""
rules_by_user = yield self._get_rules_for_event(event, context)
actions_by_user = {}
@@ -190,9 +190,16 @@ class BulkPushRuleEvaluator(object):
if matches:
actions = [x for x in rule['actions'] if x != 'dont_notify']
if actions and 'notify' in actions:
+ # Push rules say we should notify the user of this event
actions_by_user[uid] = actions
break
- defer.returnValue(actions_by_user)
+
+ # Mark in the DB staging area the push actions for users who should be
+ # notified for this event. (This will then get handled when we persist
+ # the event)
+ yield self.store.add_push_actions_to_staging(
+ event.event_id, actions_by_user,
+ )
def _condition_checker(evaluator, conditions, uid, display_name, cache):
diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py
index c16f61452c..2cbac571b8 100644
--- a/synapse/push/httppusher.py
+++ b/synapse/push/httppusher.py
@@ -13,21 +13,30 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
-from synapse.push import PusherConfigException
+import logging
from twisted.internet import defer, reactor
from twisted.internet.error import AlreadyCalled, AlreadyCancelled
-import logging
import push_rule_evaluator
import push_tools
-
+import synapse
+from synapse.push import PusherConfigException
from synapse.util.logcontext import LoggingContext
from synapse.util.metrics import Measure
logger = logging.getLogger(__name__)
+metrics = synapse.metrics.get_metrics_for(__name__)
+
+http_push_processed_counter = metrics.register_counter(
+ "http_pushes_processed",
+)
+
+http_push_failed_counter = metrics.register_counter(
+ "http_pushes_failed",
+)
+
class HttpPusher(object):
INITIAL_BACKOFF_SEC = 1 # in seconds because that's what Twisted takes
@@ -152,9 +161,16 @@ class HttpPusher(object):
self.user_id, self.last_stream_ordering, self.max_stream_ordering
)
+ logger.info(
+ "Processing %i unprocessed push actions for %s starting at "
+ "stream_ordering %s",
+ len(unprocessed), self.name, self.last_stream_ordering,
+ )
+
for push_action in unprocessed:
processed = yield self._process_one(push_action)
if processed:
+ http_push_processed_counter.inc()
self.backoff_delay = HttpPusher.INITIAL_BACKOFF_SEC
self.last_stream_ordering = push_action['stream_ordering']
yield self.store.update_pusher_last_stream_ordering_and_success(
@@ -169,6 +185,7 @@ class HttpPusher(object):
self.failing_since
)
else:
+ http_push_failed_counter.inc()
if not self.failing_since:
self.failing_since = self.clock.time_msec()
yield self.store.update_pusher_failing_since(
@@ -316,7 +333,10 @@ class HttpPusher(object):
try:
resp = yield self.http_client.post_json_get_json(self.url, notification_dict)
except Exception:
- logger.warn("Failed to push %s ", self.url)
+ logger.warn(
+ "Failed to push event %s to %s",
+ event.event_id, self.name, exc_info=True,
+ )
defer.returnValue(False)
rejected = []
if 'rejected' in resp:
@@ -325,7 +345,7 @@ class HttpPusher(object):
@defer.inlineCallbacks
def _send_badge(self, badge):
- logger.info("Sending updated badge count %d to %r", badge, self.user_id)
+ logger.info("Sending updated badge count %d to %s", badge, self.name)
d = {
'notification': {
'id': '',
@@ -347,7 +367,10 @@ class HttpPusher(object):
try:
resp = yield self.http_client.post_json_get_json(self.url, d)
except Exception:
- logger.exception("Failed to push %s ", self.url)
+ logger.warn(
+ "Failed to send badge count to %s",
+ self.name, exc_info=True,
+ )
defer.returnValue(False)
rejected = []
if 'rejected' in resp:
|