diff --git a/synapse/push/action_generator.py b/synapse/push/action_generator.py
index fe09d50d55..a5de75c48a 100644
--- a/synapse/push/action_generator.py
+++ b/synapse/push/action_generator.py
@@ -13,13 +13,13 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from twisted.internet import defer
+import logging
-from .bulk_push_rule_evaluator import BulkPushRuleEvaluator
+from twisted.internet import defer
from synapse.util.metrics import Measure
-import logging
+from .bulk_push_rule_evaluator import BulkPushRuleEvaluator
logger = logging.getLogger(__name__)
@@ -40,10 +40,6 @@ class ActionGenerator(object):
@defer.inlineCallbacks
def handle_push_actions_for_event(self, event, context):
with Measure(self.clock, "action_for_event_by_user"):
- actions_by_user = yield self.bulk_evaluator.action_for_event_by_user(
+ yield self.bulk_evaluator.action_for_event_by_user(
event, context
)
-
- context.push_actions = [
- (uid, actions) for uid, actions in actions_by_user.iteritems()
- ]
diff --git a/synapse/push/baserules.py b/synapse/push/baserules.py
index 85effdfa46..8f0682c948 100644
--- a/synapse/push/baserules.py
+++ b/synapse/push/baserules.py
@@ -1,4 +1,5 @@
# Copyright 2015, 2016 OpenMarket Ltd
+# Copyright 2017 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -12,9 +13,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from synapse.push.rulekinds import PRIORITY_CLASS_MAP, PRIORITY_CLASS_INVERSE_MAP
import copy
+from synapse.push.rulekinds import PRIORITY_CLASS_INVERSE_MAP, PRIORITY_CLASS_MAP
+
def list_with_base_rules(rawrules):
"""Combine the list of rules set by the user with the default push rules
@@ -38,7 +40,7 @@ def list_with_base_rules(rawrules):
rawrules = [r for r in rawrules if r['priority_class'] >= 0]
# shove the server default rules for each kind onto the end of each
- current_prio_class = PRIORITY_CLASS_INVERSE_MAP.keys()[-1]
+ current_prio_class = list(PRIORITY_CLASS_INVERSE_MAP)[-1]
ruleslist.extend(make_base_prepend_rules(
PRIORITY_CLASS_INVERSE_MAP[current_prio_class], modified_base_rules
@@ -238,6 +240,28 @@ BASE_APPEND_OVERRIDE_RULES = [
}
]
},
+ {
+ 'rule_id': 'global/override/.m.rule.roomnotif',
+ 'conditions': [
+ {
+ 'kind': 'event_match',
+ 'key': 'content.body',
+ 'pattern': '@room',
+ '_id': '_roomnotif_content',
+ },
+ {
+ 'kind': 'sender_notification_permission',
+ 'key': 'room',
+ '_id': '_roomnotif_pl',
+ },
+ ],
+ 'actions': [
+ 'notify', {
+ 'set_tweak': 'highlight',
+ 'value': True,
+ }
+ ]
+ }
]
diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py
index 9a96e6fe8f..1d14d3639c 100644
--- a/synapse/push/bulk_push_rule_evaluator.py
+++ b/synapse/push/bulk_push_rule_evaluator.py
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2015 OpenMarket Ltd
+# Copyright 2017 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -14,18 +15,22 @@
# limitations under the License.
import logging
+from collections import namedtuple
-from twisted.internet import defer
+from six import iteritems, itervalues
-from .push_rule_evaluator import PushRuleEvaluatorForEvent
+from prometheus_client import Counter
+
+from twisted.internet import defer
-from synapse.visibility import filter_events_for_clients_context
from synapse.api.constants import EventTypes, Membership
-from synapse.util.caches.descriptors import cached
+from synapse.event_auth import get_user_power_level
+from synapse.state import POWER_KEY
from synapse.util.async import Linearizer
+from synapse.util.caches import register_cache
+from synapse.util.caches.descriptors import cached
-from collections import namedtuple
-
+from .push_rule_evaluator import PushRuleEvaluatorForEvent
logger = logging.getLogger(__name__)
@@ -33,6 +38,20 @@ logger = logging.getLogger(__name__)
rules_by_room = {}
+push_rules_invalidation_counter = Counter(
+ "synapse_push_bulk_push_rule_evaluator_push_rules_invalidation_counter", "")
+push_rules_state_size_counter = Counter(
+ "synapse_push_bulk_push_rule_evaluator_push_rules_state_size_counter", "")
+
+# Measures whether we use the fast path of using state deltas, or if we have to
+# recalculate from scratch
+push_rules_delta_state_cache_metric = register_cache(
+ "cache",
+ "push_rules_delta_state_cache_metric",
+ cache=[], # Meaningless size, as this isn't a cache that stores values
+)
+
+
class BulkPushRuleEvaluator(object):
"""Calculates the outcome of push rules for an event for all users in the
room at once.
@@ -41,6 +60,13 @@ class BulkPushRuleEvaluator(object):
def __init__(self, hs):
self.hs = hs
self.store = hs.get_datastore()
+ self.auth = hs.get_auth()
+
+ self.room_push_rule_cache_metrics = register_cache(
+ "cache",
+ "room_push_rule_cache",
+ cache=[], # Meaningless size, as this isn't a cache that stores values
+ )
@defer.inlineCallbacks
def _get_rules_for_event(self, event, context):
@@ -79,37 +105,69 @@ class BulkPushRuleEvaluator(object):
# It's important that RulesForRoom gets added to self._get_rules_for_room.cache
# before any lookup methods get called on it as otherwise there may be
# a race if invalidate_all gets called (which assumes its in the cache)
- return RulesForRoom(self.hs, room_id, self._get_rules_for_room.cache)
+ return RulesForRoom(
+ self.hs, room_id, self._get_rules_for_room.cache,
+ self.room_push_rule_cache_metrics,
+ )
+
+ @defer.inlineCallbacks
+ def _get_power_levels_and_sender_level(self, event, context):
+ prev_state_ids = yield context.get_prev_state_ids(self.store)
+ pl_event_id = prev_state_ids.get(POWER_KEY)
+ if pl_event_id:
+ # fastpath: if there's a power level event, that's all we need, and
+ # not having a power level event is an extreme edge case
+ pl_event = yield self.store.get_event(pl_event_id)
+ auth_events = {POWER_KEY: pl_event}
+ else:
+ auth_events_ids = yield self.auth.compute_auth_events(
+ event, prev_state_ids, for_verification=False,
+ )
+ auth_events = yield self.store.get_events(auth_events_ids)
+ auth_events = {
+ (e.type, e.state_key): e for e in itervalues(auth_events)
+ }
+
+ sender_level = get_user_power_level(event.sender, auth_events)
+
+ pl_event = auth_events.get(POWER_KEY)
+
+ defer.returnValue((pl_event.content if pl_event else {}, sender_level))
@defer.inlineCallbacks
def action_for_event_by_user(self, event, context):
- """Given an event and context, evaluate the push rules and return
- the results
+ """Given an event and context, evaluate the push rules and insert the
+ results into the event_push_actions_staging table.
Returns:
- dict of user_id -> action
+ Deferred
"""
rules_by_user = yield self._get_rules_for_event(event, context)
actions_by_user = {}
- # None of these users can be peeking since this list of users comes
- # from the set of users in the room, so we know for sure they're all
- # actually in the room.
- user_tuples = [(u, False) for u in rules_by_user]
-
- filtered_by_user = yield filter_events_for_clients_context(
- self.store, user_tuples, [event], {event.event_id: context}
- )
-
room_members = yield self.store.get_joined_users_from_context(
event, context
)
- evaluator = PushRuleEvaluatorForEvent(event, len(room_members))
+ (power_levels, sender_power_level) = (
+ yield self._get_power_levels_and_sender_level(event, context)
+ )
+
+ evaluator = PushRuleEvaluatorForEvent(
+ event, len(room_members), sender_power_level, power_levels,
+ )
condition_cache = {}
- for uid, rules in rules_by_user.iteritems():
+ for uid, rules in iteritems(rules_by_user):
+ if event.sender == uid:
+ continue
+
+ if not event.is_state():
+ is_ignored = yield self.store.is_ignored_by(event.sender, uid)
+ if is_ignored:
+ continue
+
display_name = None
profile_info = room_members.get(uid)
if profile_info:
@@ -121,13 +179,6 @@ class BulkPushRuleEvaluator(object):
if event.type == EventTypes.Member and event.state_key == uid:
display_name = event.content.get("displayname", None)
- filtered = filtered_by_user[uid]
- if len(filtered) == 0:
- continue
-
- if filtered[0].sender == uid:
- continue
-
for rule in rules:
if 'enabled' in rule and not rule['enabled']:
continue
@@ -138,9 +189,16 @@ class BulkPushRuleEvaluator(object):
if matches:
actions = [x for x in rule['actions'] if x != 'dont_notify']
if actions and 'notify' in actions:
+ # Push rules say we should notify the user of this event
actions_by_user[uid] = actions
break
- defer.returnValue(actions_by_user)
+
+ # Mark in the DB staging area the push actions for users who should be
+ # notified for this event. (This will then get handled when we persist
+ # the event)
+ yield self.store.add_push_actions_to_staging(
+ event.event_id, actions_by_user,
+ )
def _condition_checker(evaluator, conditions, uid, display_name, cache):
@@ -170,17 +228,19 @@ class RulesForRoom(object):
the entire cache for the room.
"""
- def __init__(self, hs, room_id, rules_for_room_cache):
+ def __init__(self, hs, room_id, rules_for_room_cache, room_push_rule_cache_metrics):
"""
Args:
hs (HomeServer)
room_id (str)
rules_for_room_cache(Cache): The cache object that caches these
RoomsForUser objects.
+ room_push_rule_cache_metrics (CacheMetric)
"""
self.room_id = room_id
self.is_mine_id = hs.is_mine_id
self.store = hs.get_datastore()
+ self.room_push_rule_cache_metrics = room_push_rule_cache_metrics
self.linearizer = Linearizer(name="rules_for_room")
@@ -222,11 +282,19 @@ class RulesForRoom(object):
"""
state_group = context.state_group
+ if state_group and self.state_group == state_group:
+ logger.debug("Using cached rules for %r", self.room_id)
+ self.room_push_rule_cache_metrics.inc_hits()
+ defer.returnValue(self.rules_by_user)
+
with (yield self.linearizer.queue(())):
if state_group and self.state_group == state_group:
logger.debug("Using cached rules for %r", self.room_id)
+ self.room_push_rule_cache_metrics.inc_hits()
defer.returnValue(self.rules_by_user)
+ self.room_push_rule_cache_metrics.inc_misses()
+
ret_rules_by_user = {}
missing_member_event_ids = {}
if state_group and self.state_group == context.prev_group:
@@ -234,8 +302,13 @@ class RulesForRoom(object):
# results.
ret_rules_by_user = self.rules_by_user
current_state_ids = context.delta_ids
+
+ push_rules_delta_state_cache_metric.inc_hits()
else:
- current_state_ids = context.current_state_ids
+ current_state_ids = yield context.get_current_state_ids(self.store)
+ push_rules_delta_state_cache_metric.inc_misses()
+
+ push_rules_state_size_counter.inc(len(current_state_ids))
logger.debug(
"Looking for member changes in %r %r", state_group, current_state_ids
@@ -282,6 +355,14 @@ class RulesForRoom(object):
yield self._update_rules_with_member_event_ids(
ret_rules_by_user, missing_member_event_ids, state_group, event
)
+ else:
+ # The push rules didn't change but lets update the cache anyway
+ self.update_cache(
+ self.sequence,
+ members={}, # There were no membership changes
+ rules_by_user=ret_rules_by_user,
+ state_group=state_group
+ )
if logger.isEnabledFor(logging.DEBUG):
logger.debug(
@@ -324,7 +405,7 @@ class RulesForRoom(object):
# If the event is a join event then it will be in current state evnts
# map but not in the DB, so we have to explicitly insert it.
if event.type == EventTypes.Member:
- for event_id in member_event_ids.itervalues():
+ for event_id in itervalues(member_event_ids):
if event_id == event.event_id:
members[event_id] = (event.state_key, event.membership)
@@ -332,7 +413,7 @@ class RulesForRoom(object):
logger.debug("Found members %r: %r", self.room_id, members.values())
interested_in_user_ids = set(
- user_id for user_id, membership in members.itervalues()
+ user_id for user_id, membership in itervalues(members)
if membership == Membership.JOIN
)
@@ -344,7 +425,7 @@ class RulesForRoom(object):
)
user_ids = set(
- uid for uid, have_pusher in if_users_with_pushers.iteritems() if have_pusher
+ uid for uid, have_pusher in iteritems(if_users_with_pushers) if have_pusher
)
logger.debug("With pushers: %r", user_ids)
@@ -365,7 +446,7 @@ class RulesForRoom(object):
)
ret_rules_by_user.update(
- item for item in rules_by_user.iteritems() if item[0] is not None
+ item for item in iteritems(rules_by_user) if item[0] is not None
)
self.update_cache(sequence, members, ret_rules_by_user, state_group)
@@ -380,6 +461,7 @@ class RulesForRoom(object):
self.state_group = object()
self.member_map = {}
self.rules_by_user = {}
+ push_rules_invalidation_counter.inc()
def update_cache(self, sequence, members, rules_by_user, state_group):
if sequence == self.sequence:
diff --git a/synapse/push/clientformat.py b/synapse/push/clientformat.py
index e0331b2d2d..ecbf364a5e 100644
--- a/synapse/push/clientformat.py
+++ b/synapse/push/clientformat.py
@@ -13,12 +13,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from synapse.push.rulekinds import (
- PRIORITY_CLASS_MAP, PRIORITY_CLASS_INVERSE_MAP
-)
-
import copy
+from synapse.push.rulekinds import PRIORITY_CLASS_INVERSE_MAP, PRIORITY_CLASS_MAP
+
def format_push_rules_for_user(user, ruleslist):
"""Converts a list of rawrules and a enabled map into nested dictionaries
diff --git a/synapse/push/emailpusher.py b/synapse/push/emailpusher.py
index a69dda7b09..d746371420 100644
--- a/synapse/push/emailpusher.py
+++ b/synapse/push/emailpusher.py
@@ -13,14 +13,13 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from twisted.internet import defer, reactor
-from twisted.internet.error import AlreadyCalled, AlreadyCancelled
-
import logging
-from synapse.util.metrics import Measure
-from synapse.util.logcontext import LoggingContext
+from twisted.internet import defer
+from twisted.internet.error import AlreadyCalled, AlreadyCancelled
+from synapse.util.logcontext import LoggingContext
+from synapse.util.metrics import Measure
logger = logging.getLogger(__name__)
@@ -77,10 +76,13 @@ class EmailPusher(object):
@defer.inlineCallbacks
def on_started(self):
if self.mailer is not None:
- self.throttle_params = yield self.store.get_throttle_params_by_room(
- self.pusher_id
- )
- yield self._process()
+ try:
+ self.throttle_params = yield self.store.get_throttle_params_by_room(
+ self.pusher_id
+ )
+ yield self._process()
+ except Exception:
+ logger.exception("Error starting email pusher")
def on_stop(self):
if self.timed_call:
@@ -121,7 +123,7 @@ class EmailPusher(object):
starting_max_ordering = self.max_stream_ordering
try:
yield self._unsafe_process()
- except:
+ except Exception:
logger.exception("Exception processing notifs")
if self.max_stream_ordering == starting_max_ordering:
break
@@ -196,7 +198,7 @@ class EmailPusher(object):
self.timed_call = None
if soonest_due_at is not None:
- self.timed_call = reactor.callLater(
+ self.timed_call = self.hs.get_reactor().callLater(
self.seconds_until(soonest_due_at), self.on_timer
)
diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py
index 8a5d473108..81e18bcf7d 100644
--- a/synapse/push/httppusher.py
+++ b/synapse/push/httppusher.py
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2015, 2016 OpenMarket Ltd
+# Copyright 2017 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -12,21 +13,25 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+import logging
-from synapse.push import PusherConfigException
+from prometheus_client import Counter
-from twisted.internet import defer, reactor
+from twisted.internet import defer
from twisted.internet.error import AlreadyCalled, AlreadyCancelled
-import logging
-import push_rule_evaluator
-import push_tools
-
+from synapse.push import PusherConfigException
from synapse.util.logcontext import LoggingContext
from synapse.util.metrics import Measure
+from . import push_rule_evaluator, push_tools
+
logger = logging.getLogger(__name__)
+http_push_processed_counter = Counter("synapse_http_httppusher_http_pushes_processed", "")
+
+http_push_failed_counter = Counter("synapse_http_httppusher_http_pushes_failed", "")
+
class HttpPusher(object):
INITIAL_BACKOFF_SEC = 1 # in seconds because that's what Twisted takes
@@ -84,7 +89,10 @@ class HttpPusher(object):
@defer.inlineCallbacks
def on_started(self):
- yield self._process()
+ try:
+ yield self._process()
+ except Exception:
+ logger.exception("Error starting http pusher")
@defer.inlineCallbacks
def on_new_notifications(self, min_stream_ordering, max_stream_ordering):
@@ -131,7 +139,7 @@ class HttpPusher(object):
starting_max_ordering = self.max_stream_ordering
try:
yield self._unsafe_process()
- except:
+ except Exception:
logger.exception("Exception processing notifs")
if self.max_stream_ordering == starting_max_ordering:
break
@@ -151,9 +159,16 @@ class HttpPusher(object):
self.user_id, self.last_stream_ordering, self.max_stream_ordering
)
+ logger.info(
+ "Processing %i unprocessed push actions for %s starting at "
+ "stream_ordering %s",
+ len(unprocessed), self.name, self.last_stream_ordering,
+ )
+
for push_action in unprocessed:
processed = yield self._process_one(push_action)
if processed:
+ http_push_processed_counter.inc()
self.backoff_delay = HttpPusher.INITIAL_BACKOFF_SEC
self.last_stream_ordering = push_action['stream_ordering']
yield self.store.update_pusher_last_stream_ordering_and_success(
@@ -168,6 +183,7 @@ class HttpPusher(object):
self.failing_since
)
else:
+ http_push_failed_counter.inc()
if not self.failing_since:
self.failing_since = self.clock.time_msec()
yield self.store.update_pusher_failing_since(
@@ -204,7 +220,9 @@ class HttpPusher(object):
)
else:
logger.info("Push failed: delaying for %ds", self.backoff_delay)
- self.timed_call = reactor.callLater(self.backoff_delay, self.on_timer)
+ self.timed_call = self.hs.get_reactor().callLater(
+ self.backoff_delay, self.on_timer
+ )
self.backoff_delay = min(self.backoff_delay * 2, self.MAX_BACKOFF_SEC)
break
@@ -244,6 +262,26 @@ class HttpPusher(object):
@defer.inlineCallbacks
def _build_notification_dict(self, event, tweaks, badge):
+ if self.data.get('format') == 'event_id_only':
+ d = {
+ 'notification': {
+ 'event_id': event.event_id,
+ 'room_id': event.room_id,
+ 'counts': {
+ 'unread': badge,
+ },
+ 'devices': [
+ {
+ 'app_id': self.app_id,
+ 'pushkey': self.pushkey,
+ 'pushkey_ts': long(self.pushkey_ts / 1000),
+ 'data': self.data_minus_url,
+ }
+ ]
+ }
+ }
+ defer.returnValue(d)
+
ctx = yield push_tools.get_context_for_event(
self.store, self.state_handler, event, self.user_id
)
@@ -275,7 +313,7 @@ class HttpPusher(object):
if event.type == 'm.room.member':
d['notification']['membership'] = event.content['membership']
d['notification']['user_is_target'] = event.state_key == self.user_id
- if not self.hs.config.push_redact_content and 'content' in event:
+ if self.hs.config.push_include_content and 'content' in event:
d['notification']['content'] = event.content
# We no longer send aliases separately, instead, we send the human
@@ -294,8 +332,11 @@ class HttpPusher(object):
defer.returnValue([])
try:
resp = yield self.http_client.post_json_get_json(self.url, notification_dict)
- except:
- logger.warn("Failed to push %s ", self.url)
+ except Exception:
+ logger.warn(
+ "Failed to push event %s to %s",
+ event.event_id, self.name, exc_info=True,
+ )
defer.returnValue(False)
rejected = []
if 'rejected' in resp:
@@ -304,7 +345,7 @@ class HttpPusher(object):
@defer.inlineCallbacks
def _send_badge(self, badge):
- logger.info("Sending updated badge count %d to %r", badge, self.user_id)
+ logger.info("Sending updated badge count %d to %s", badge, self.name)
d = {
'notification': {
'id': '',
@@ -325,8 +366,11 @@ class HttpPusher(object):
}
try:
resp = yield self.http_client.post_json_get_json(self.url, d)
- except:
- logger.exception("Failed to push %s ", self.url)
+ except Exception:
+ logger.warn(
+ "Failed to send badge count to %s",
+ self.name, exc_info=True,
+ )
defer.returnValue(False)
rejected = []
if 'rejected' in resp:
diff --git a/synapse/push/mailer.py b/synapse/push/mailer.py
index b5cd9b426a..9d601208fd 100644
--- a/synapse/push/mailer.py
+++ b/synapse/push/mailer.py
@@ -13,30 +13,31 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from twisted.internet import defer
-from twisted.mail.smtp import sendmail
-
-import email.utils
import email.mime.multipart
-from email.mime.text import MIMEText
+import email.utils
+import logging
+import time
+import urllib
from email.mime.multipart import MIMEMultipart
+from email.mime.text import MIMEText
-from synapse.util.async import concurrently_execute
+import bleach
+import jinja2
+
+from twisted.internet import defer
+from twisted.mail.smtp import sendmail
+
+from synapse.api.constants import EventTypes
+from synapse.api.errors import StoreError
from synapse.push.presentable_names import (
- calculate_room_name, name_from_member_event, descriptor_from_member_events
+ calculate_room_name,
+ descriptor_from_member_events,
+ name_from_member_event,
)
from synapse.types import UserID
-from synapse.api.errors import StoreError
-from synapse.api.constants import EventTypes
+from synapse.util.async import concurrently_execute
from synapse.visibility import filter_events_for_client
-import jinja2
-import bleach
-
-import time
-import urllib
-
-import logging
logger = logging.getLogger(__name__)
@@ -229,7 +230,8 @@ class Mailer(object):
if room_vars['notifs'] and 'messages' in room_vars['notifs'][-1]:
prev_messages = room_vars['notifs'][-1]['messages']
for message in notifvars['messages']:
- pm = filter(lambda pm: pm['id'] == message['id'], prev_messages)
+ pm = list(filter(lambda pm: pm['id'] == message['id'],
+ prev_messages))
if pm:
if not message["is_historical"]:
pm[0]["is_historical"] = False
diff --git a/synapse/push/presentable_names.py b/synapse/push/presentable_names.py
index 277da3cd35..eef6e18c2e 100644
--- a/synapse/push/presentable_names.py
+++ b/synapse/push/presentable_names.py
@@ -13,10 +13,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from twisted.internet import defer
-
-import re
import logging
+import re
+
+from twisted.internet import defer
logger = logging.getLogger(__name__)
@@ -113,7 +113,7 @@ def calculate_room_name(store, room_state_ids, user_id, fallback_to_members=True
# so find out who is in the room that isn't the user.
if "m.room.member" in room_state_bytype_ids:
member_events = yield store.get_events(
- room_state_bytype_ids["m.room.member"].values()
+ list(room_state_bytype_ids["m.room.member"].values())
)
all_members = [
ev for ev in member_events.values()
diff --git a/synapse/push/push_rule_evaluator.py b/synapse/push/push_rule_evaluator.py
index 172c27c137..2bd321d530 100644
--- a/synapse/push/push_rule_evaluator.py
+++ b/synapse/push/push_rule_evaluator.py
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2015, 2016 OpenMarket Ltd
+# Copyright 2017 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -16,6 +17,8 @@
import logging
import re
+from six import string_types
+
from synapse.types import UserID
from synapse.util.caches import CACHE_SIZE_FACTOR, register_cache
from synapse.util.caches.lrucache import LruCache
@@ -29,6 +32,21 @@ INEQUALITY_EXPR = re.compile("^([=<>]*)([0-9]*)$")
def _room_member_count(ev, condition, room_member_count):
+ return _test_ineq_condition(condition, room_member_count)
+
+
+def _sender_notification_permission(ev, condition, sender_power_level, power_levels):
+ notif_level_key = condition.get('key')
+ if notif_level_key is None:
+ return False
+
+ notif_levels = power_levels.get('notifications', {})
+ room_notif_level = notif_levels.get(notif_level_key, 50)
+
+ return sender_power_level >= room_notif_level
+
+
+def _test_ineq_condition(condition, number):
if 'is' not in condition:
return False
m = INEQUALITY_EXPR.match(condition['is'])
@@ -41,15 +59,15 @@ def _room_member_count(ev, condition, room_member_count):
rhs = int(rhs)
if ineq == '' or ineq == '==':
- return room_member_count == rhs
+ return number == rhs
elif ineq == '<':
- return room_member_count < rhs
+ return number < rhs
elif ineq == '>':
- return room_member_count > rhs
+ return number > rhs
elif ineq == '>=':
- return room_member_count >= rhs
+ return number >= rhs
elif ineq == '<=':
- return room_member_count <= rhs
+ return number <= rhs
else:
return False
@@ -65,9 +83,11 @@ def tweaks_for_actions(actions):
class PushRuleEvaluatorForEvent(object):
- def __init__(self, event, room_member_count):
+ def __init__(self, event, room_member_count, sender_power_level, power_levels):
self._event = event
self._room_member_count = room_member_count
+ self._sender_power_level = sender_power_level
+ self._power_levels = power_levels
# Maps strings of e.g. 'content.body' -> event["content"]["body"]
self._value_cache = _flatten_dict(event)
@@ -81,6 +101,10 @@ class PushRuleEvaluatorForEvent(object):
return _room_member_count(
self._event, condition, self._room_member_count
)
+ elif condition['kind'] == 'sender_notification_permission':
+ return _sender_notification_permission(
+ self._event, condition, self._sender_power_level, self._power_levels,
+ )
else:
return True
@@ -128,7 +152,7 @@ class PushRuleEvaluatorForEvent(object):
# Caches (glob, word_boundary) -> regex for push. See _glob_matches
regex_cache = LruCache(50000 * CACHE_SIZE_FACTOR)
-register_cache("regex_push_cache", regex_cache)
+register_cache("cache", "regex_push_cache", regex_cache)
def _glob_matches(glob, value, word_boundary=False):
@@ -183,7 +207,7 @@ def _glob_to_re(glob, word_boundary):
r,
)
if word_boundary:
- r = r"\b%s\b" % (r,)
+ r = _re_word_boundary(r)
return re.compile(r, flags=re.IGNORECASE)
else:
@@ -192,7 +216,7 @@ def _glob_to_re(glob, word_boundary):
return re.compile(r, flags=re.IGNORECASE)
elif word_boundary:
r = re.escape(glob)
- r = r"\b%s\b" % (r,)
+ r = _re_word_boundary(r)
return re.compile(r, flags=re.IGNORECASE)
else:
@@ -200,11 +224,23 @@ def _glob_to_re(glob, word_boundary):
return re.compile(r, flags=re.IGNORECASE)
+def _re_word_boundary(r):
+ """
+ Adds word boundary characters to the start and end of an
+ expression to require that the match occur as a whole word,
+ but do so respecting the fact that strings starting or ending
+ with non-word characters will change word boundaries.
+ """
+ # we can't use \b as it chokes on unicode. however \W seems to be okay
+ # as shorthand for [^0-9A-Za-z_].
+ return r"(^|\W)%s(\W|$)" % (r,)
+
+
def _flatten_dict(d, prefix=[], result=None):
if result is None:
result = {}
for key, value in d.items():
- if isinstance(value, basestring):
+ if isinstance(value, string_types):
result[".".join(prefix + [key])] = value.lower()
elif hasattr(value, "items"):
_flatten_dict(value, prefix=(prefix + [key]), result=result)
diff --git a/synapse/push/push_tools.py b/synapse/push/push_tools.py
index 6835f54e97..8049c298c2 100644
--- a/synapse/push/push_tools.py
+++ b/synapse/push/push_tools.py
@@ -14,9 +14,8 @@
# limitations under the License.
from twisted.internet import defer
-from synapse.push.presentable_names import (
- calculate_room_name, name_from_member_event
-)
+
+from synapse.push.presentable_names import calculate_room_name, name_from_member_event
@defer.inlineCallbacks
diff --git a/synapse/push/pusher.py b/synapse/push/pusher.py
index 491f27bded..fcee6d9d7e 100644
--- a/synapse/push/pusher.py
+++ b/synapse/push/pusher.py
@@ -13,9 +13,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from httppusher import HttpPusher
-
import logging
+
+from .httppusher import HttpPusher
+
logger = logging.getLogger(__name__)
# We try importing this if we can (it will fail if we don't
@@ -27,7 +28,7 @@ logger = logging.getLogger(__name__)
try:
from synapse.push.emailpusher import EmailPusher
from synapse.push.mailer import Mailer, load_jinja2_templates
-except:
+except Exception:
pass
diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py
index 43cb6e9c01..36bb5bbc65 100644
--- a/synapse/push/pusherpool.py
+++ b/synapse/push/pusherpool.py
@@ -14,13 +14,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+import logging
+
from twisted.internet import defer
-from .pusher import PusherFactory
-from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred
-from synapse.util.async import run_on_reactor
-
-import logging
+from synapse.push.pusher import PusherFactory
+from synapse.util.logcontext import make_deferred_yieldable, run_in_background
logger = logging.getLogger(__name__)
@@ -103,23 +102,28 @@ class PusherPool:
yield self.remove_pusher(p['app_id'], p['pushkey'], p['user_name'])
@defer.inlineCallbacks
- def remove_pushers_by_user(self, user_id, except_access_token_id=None):
- all = yield self.store.get_all_pushers()
- logger.info(
- "Removing all pushers for user %s except access tokens id %r",
- user_id, except_access_token_id
- )
- for p in all:
- if p['user_name'] == user_id and p['access_token'] != except_access_token_id:
+ def remove_pushers_by_access_token(self, user_id, access_tokens):
+ """Remove the pushers for a given user corresponding to a set of
+ access_tokens.
+
+ Args:
+ user_id (str): user to remove pushers for
+ access_tokens (Iterable[int]): access token *ids* to remove pushers
+ for
+ """
+ tokens = set(access_tokens)
+ for p in (yield self.store.get_pushers_by_user_id(user_id)):
+ if p['access_token'] in tokens:
logger.info(
"Removing pusher for app id %s, pushkey %s, user %s",
p['app_id'], p['pushkey'], p['user_name']
)
- yield self.remove_pusher(p['app_id'], p['pushkey'], p['user_name'])
+ yield self.remove_pusher(
+ p['app_id'], p['pushkey'], p['user_name'],
+ )
@defer.inlineCallbacks
def on_new_notifications(self, min_stream_id, max_stream_id):
- yield run_on_reactor()
try:
users_affected = yield self.store.get_push_action_users_in_range(
min_stream_id, max_stream_id
@@ -131,18 +135,20 @@ class PusherPool:
if u in self.pushers:
for p in self.pushers[u].values():
deferreds.append(
- preserve_fn(p.on_new_notifications)(
- min_stream_id, max_stream_id
+ run_in_background(
+ p.on_new_notifications,
+ min_stream_id, max_stream_id,
)
)
- yield preserve_context_over_deferred(defer.gatherResults(deferreds))
- except:
+ yield make_deferred_yieldable(
+ defer.gatherResults(deferreds, consumeErrors=True),
+ )
+ except Exception:
logger.exception("Exception in pusher on_new_notifications")
@defer.inlineCallbacks
def on_new_receipts(self, min_stream_id, max_stream_id, affected_room_ids):
- yield run_on_reactor()
try:
# Need to subtract 1 from the minimum because the lower bound here
# is not inclusive
@@ -158,11 +164,16 @@ class PusherPool:
if u in self.pushers:
for p in self.pushers[u].values():
deferreds.append(
- preserve_fn(p.on_new_receipts)(min_stream_id, max_stream_id)
+ run_in_background(
+ p.on_new_receipts,
+ min_stream_id, max_stream_id,
+ )
)
- yield preserve_context_over_deferred(defer.gatherResults(deferreds))
- except:
+ yield make_deferred_yieldable(
+ defer.gatherResults(deferreds, consumeErrors=True),
+ )
+ except Exception:
logger.exception("Exception in pusher on_new_receipts")
@defer.inlineCallbacks
@@ -188,7 +199,7 @@ class PusherPool:
for pusherdict in pushers:
try:
p = self.pusher_factory.create_pusher(pusherdict)
- except:
+ except Exception:
logger.exception("Couldn't start a pusher: caught Exception")
continue
if p:
@@ -201,7 +212,7 @@ class PusherPool:
if appid_pushkey in byuser:
byuser[appid_pushkey].on_stop()
byuser[appid_pushkey] = p
- preserve_fn(p.on_started)()
+ run_in_background(p.on_started)
logger.info("Started pushers")
|