diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py
index 9a96e6fe8f..b0d64aa6c4 100644
--- a/synapse/push/bulk_push_rule_evaluator.py
+++ b/synapse/push/bulk_push_rule_evaluator.py
@@ -19,8 +19,9 @@ from twisted.internet import defer
from .push_rule_evaluator import PushRuleEvaluatorForEvent
-from synapse.visibility import filter_events_for_clients_context
from synapse.api.constants import EventTypes, Membership
+from synapse.metrics import get_metrics_for
+from synapse.util.caches import metrics as cache_metrics
from synapse.util.caches.descriptors import cached
from synapse.util.async import Linearizer
@@ -32,6 +33,23 @@ logger = logging.getLogger(__name__)
rules_by_room = {}
+push_metrics = get_metrics_for(__name__)
+
+push_rules_invalidation_counter = push_metrics.register_counter(
+ "push_rules_invalidation_counter"
+)
+push_rules_state_size_counter = push_metrics.register_counter(
+ "push_rules_state_size_counter"
+)
+
+# Measures whether we use the fast path of using state deltas, or if we have to
+# recalculate from scratch
+push_rules_delta_state_cache_metric = cache_metrics.register_cache(
+ "cache",
+ size_callback=lambda: 0, # Meaningless size, as this isn't a cache that stores values
+ cache_name="push_rules_delta_state_cache_metric",
+)
+
class BulkPushRuleEvaluator(object):
"""Calculates the outcome of push rules for an event for all users in the
@@ -42,6 +60,12 @@ class BulkPushRuleEvaluator(object):
self.hs = hs
self.store = hs.get_datastore()
+ self.room_push_rule_cache_metrics = cache_metrics.register_cache(
+ "cache",
+ size_callback=lambda: 0, # There's not good value for this
+ cache_name="room_push_rule_cache",
+ )
+
@defer.inlineCallbacks
def _get_rules_for_event(self, event, context):
"""This gets the rules for all users in the room at the time of the event,
@@ -79,7 +103,10 @@ class BulkPushRuleEvaluator(object):
# It's important that RulesForRoom gets added to self._get_rules_for_room.cache
# before any lookup methods get called on it as otherwise there may be
# a race if invalidate_all gets called (which assumes its in the cache)
- return RulesForRoom(self.hs, room_id, self._get_rules_for_room.cache)
+ return RulesForRoom(
+ self.hs, room_id, self._get_rules_for_room.cache,
+ self.room_push_rule_cache_metrics,
+ )
@defer.inlineCallbacks
def action_for_event_by_user(self, event, context):
@@ -92,15 +119,6 @@ class BulkPushRuleEvaluator(object):
rules_by_user = yield self._get_rules_for_event(event, context)
actions_by_user = {}
- # None of these users can be peeking since this list of users comes
- # from the set of users in the room, so we know for sure they're all
- # actually in the room.
- user_tuples = [(u, False) for u in rules_by_user]
-
- filtered_by_user = yield filter_events_for_clients_context(
- self.store, user_tuples, [event], {event.event_id: context}
- )
-
room_members = yield self.store.get_joined_users_from_context(
event, context
)
@@ -110,6 +128,14 @@ class BulkPushRuleEvaluator(object):
condition_cache = {}
for uid, rules in rules_by_user.iteritems():
+ if event.sender == uid:
+ continue
+
+ if not event.is_state():
+ is_ignored = yield self.store.is_ignored_by(event.sender, uid)
+ if is_ignored:
+ continue
+
display_name = None
profile_info = room_members.get(uid)
if profile_info:
@@ -121,13 +147,6 @@ class BulkPushRuleEvaluator(object):
if event.type == EventTypes.Member and event.state_key == uid:
display_name = event.content.get("displayname", None)
- filtered = filtered_by_user[uid]
- if len(filtered) == 0:
- continue
-
- if filtered[0].sender == uid:
- continue
-
for rule in rules:
if 'enabled' in rule and not rule['enabled']:
continue
@@ -170,17 +189,19 @@ class RulesForRoom(object):
the entire cache for the room.
"""
- def __init__(self, hs, room_id, rules_for_room_cache):
+ def __init__(self, hs, room_id, rules_for_room_cache, room_push_rule_cache_metrics):
"""
Args:
hs (HomeServer)
room_id (str)
rules_for_room_cache(Cache): The cache object that caches these
RoomsForUser objects.
+ room_push_rule_cache_metrics (CacheMetric)
"""
self.room_id = room_id
self.is_mine_id = hs.is_mine_id
self.store = hs.get_datastore()
+ self.room_push_rule_cache_metrics = room_push_rule_cache_metrics
self.linearizer = Linearizer(name="rules_for_room")
@@ -222,11 +243,19 @@ class RulesForRoom(object):
"""
state_group = context.state_group
+ if state_group and self.state_group == state_group:
+ logger.debug("Using cached rules for %r", self.room_id)
+ self.room_push_rule_cache_metrics.inc_hits()
+ defer.returnValue(self.rules_by_user)
+
with (yield self.linearizer.queue(())):
if state_group and self.state_group == state_group:
logger.debug("Using cached rules for %r", self.room_id)
+ self.room_push_rule_cache_metrics.inc_hits()
defer.returnValue(self.rules_by_user)
+ self.room_push_rule_cache_metrics.inc_misses()
+
ret_rules_by_user = {}
missing_member_event_ids = {}
if state_group and self.state_group == context.prev_group:
@@ -234,8 +263,13 @@ class RulesForRoom(object):
# results.
ret_rules_by_user = self.rules_by_user
current_state_ids = context.delta_ids
+
+ push_rules_delta_state_cache_metric.inc_hits()
else:
current_state_ids = context.current_state_ids
+ push_rules_delta_state_cache_metric.inc_misses()
+
+ push_rules_state_size_counter.inc_by(len(current_state_ids))
logger.debug(
"Looking for member changes in %r %r", state_group, current_state_ids
@@ -282,6 +316,14 @@ class RulesForRoom(object):
yield self._update_rules_with_member_event_ids(
ret_rules_by_user, missing_member_event_ids, state_group, event
)
+ else:
+ # The push rules didn't change but lets update the cache anyway
+ self.update_cache(
+ self.sequence,
+ members={}, # There were no membership changes
+ rules_by_user=ret_rules_by_user,
+ state_group=state_group
+ )
if logger.isEnabledFor(logging.DEBUG):
logger.debug(
@@ -380,6 +422,7 @@ class RulesForRoom(object):
self.state_group = object()
self.member_map = {}
self.rules_by_user = {}
+ push_rules_invalidation_counter.inc()
def update_cache(self, sequence, members, rules_by_user, state_group):
if sequence == self.sequence:
diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py
index c0f8176e3d..8a5d473108 100644
--- a/synapse/push/httppusher.py
+++ b/synapse/push/httppusher.py
@@ -275,7 +275,7 @@ class HttpPusher(object):
if event.type == 'm.room.member':
d['notification']['membership'] = event.content['membership']
d['notification']['user_is_target'] = event.state_key == self.user_id
- if 'content' in event:
+ if not self.hs.config.push_redact_content and 'content' in event:
d['notification']['content'] = event.content
# We no longer send aliases separately, instead, we send the human
diff --git a/synapse/push/push_rule_evaluator.py b/synapse/push/push_rule_evaluator.py
index 4d88046579..172c27c137 100644
--- a/synapse/push/push_rule_evaluator.py
+++ b/synapse/push/push_rule_evaluator.py
@@ -200,7 +200,9 @@ def _glob_to_re(glob, word_boundary):
return re.compile(r, flags=re.IGNORECASE)
-def _flatten_dict(d, prefix=[], result={}):
+def _flatten_dict(d, prefix=[], result=None):
+ if result is None:
+ result = {}
for key, value in d.items():
if isinstance(value, basestring):
result[".".join(prefix + [key])] = value.lower()
|