diff --git a/synapse/push/__init__.py b/synapse/push/__init__.py
index e7c964bcd2..8da2d8716c 100644
--- a/synapse/push/__init__.py
+++ b/synapse/push/__init__.py
@@ -1,5 +1,5 @@
# -*- coding: utf-8 -*-
-# Copyright 2015 OpenMarket Ltd
+# Copyright 2015, 2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -17,6 +17,8 @@ from twisted.internet import defer
from synapse.streams.config import PaginationConfig
from synapse.types import StreamToken
+from synapse.util.logcontext import LoggingContext
+from synapse.util.metrics import Measure
import synapse.util.async
import push_rule_evaluator as push_rule_evaluator
@@ -27,12 +29,25 @@ import random
logger = logging.getLogger(__name__)
+_NEXT_ID = 1
+
+
+def _get_next_id():
+ global _NEXT_ID
+ _id = _NEXT_ID
+ _NEXT_ID += 1
+ return _id
+
+
+# Pushers could now be moved to pull out of the event_push_actions table instead
+# of listening on the event stream: this would avoid them having to run the
+# rules again.
class Pusher(object):
INITIAL_BACKOFF = 1000
MAX_BACKOFF = 60 * 60 * 1000
GIVE_UP_AFTER = 24 * 60 * 60 * 1000
- def __init__(self, _hs, profile_tag, user_name, app_id,
+ def __init__(self, _hs, profile_tag, user_id, app_id,
app_display_name, device_display_name, pushkey, pushkey_ts,
data, last_token, last_success, failing_since):
self.hs = _hs
@@ -40,7 +55,7 @@ class Pusher(object):
self.store = self.hs.get_datastore()
self.clock = self.hs.get_clock()
self.profile_tag = profile_tag
- self.user_name = user_name
+ self.user_id = user_id
self.app_id = app_id
self.app_display_name = app_display_name
self.device_display_name = device_display_name
@@ -52,6 +67,9 @@ class Pusher(object):
self.backoff_delay = Pusher.INITIAL_BACKOFF
self.failing_since = failing_since
self.alive = True
+ self.badge = None
+
+ self.name = "Pusher-%d" % (_get_next_id(),)
# The last value of last_active_time that we saw
self.last_last_active_time = 0
@@ -82,64 +100,82 @@ class Pusher(object):
@defer.inlineCallbacks
def start(self):
- if not self.last_token:
- # First-time setup: get a token to start from (we can't
- # just start from no token, ie. 'now'
- # because we need the result to be reproduceable in case
- # we fail to dispatch the push)
- config = PaginationConfig(from_token=None, limit='1')
- chunk = yield self.evStreamHandler.get_stream(
- self.user_name, config, timeout=0, affect_presence=False,
- only_room_events=True
- )
- self.last_token = chunk['end']
- self.store.update_pusher_last_token(
- self.app_id, self.pushkey, self.user_name, self.last_token
- )
- logger.info("Pusher %s for user %s starting from token %s",
- self.pushkey, self.user_name, self.last_token)
-
- wait = 0
- while self.alive:
- try:
- if wait > 0:
- yield synapse.util.async.sleep(wait)
- yield self.get_and_dispatch()
- wait = 0
- except:
- if wait == 0:
- wait = 1
- else:
- wait = min(wait * 2, 1800)
- logger.exception(
- "Exception in pusher loop for pushkey %s. Pausing for %ds",
- self.pushkey, wait
+ with LoggingContext(self.name):
+ if not self.last_token:
+ # First-time setup: get a token to start from (we can't
+ # just start from no token, ie. 'now'
+ # because we need the result to be reproduceable in case
+ # we fail to dispatch the push)
+ config = PaginationConfig(from_token=None, limit='1')
+ chunk = yield self.evStreamHandler.get_stream(
+ self.user_id, config, timeout=0, affect_presence=False
+ )
+ self.last_token = chunk['end']
+ yield self.store.update_pusher_last_token(
+ self.app_id, self.pushkey, self.user_id, self.last_token
+ )
+ logger.info("New pusher %s for user %s starting from token %s",
+ self.pushkey, self.user_id, self.last_token)
+
+ else:
+ logger.info(
+ "Old pusher %s for user %s starting",
+ self.pushkey, self.user_id,
)
+ wait = 0
+ while self.alive:
+ try:
+ if wait > 0:
+ yield synapse.util.async.sleep(wait)
+ with Measure(self.clock, "push"):
+ yield self.get_and_dispatch()
+ wait = 0
+ except:
+ if wait == 0:
+ wait = 1
+ else:
+ wait = min(wait * 2, 1800)
+ logger.exception(
+ "Exception in pusher loop for pushkey %s. Pausing for %ds",
+ self.pushkey, wait
+ )
+
@defer.inlineCallbacks
def get_and_dispatch(self):
from_tok = StreamToken.from_string(self.last_token)
config = PaginationConfig(from_token=from_tok, limit='1')
timeout = (300 + random.randint(-60, 60)) * 1000
chunk = yield self.evStreamHandler.get_stream(
- self.user_name, config, timeout=timeout, affect_presence=False,
- only_room_events=True
+ self.user_id, config, timeout=timeout, affect_presence=False,
+ only_keys=("room", "receipt",),
)
# limiting to 1 may get 1 event plus 1 presence event, so
# pick out the actual event
single_event = None
+ read_receipt = None
for c in chunk['chunk']:
if 'event_id' in c: # Hmmm...
single_event = c
- break
+ elif c['type'] == 'm.receipt':
+ read_receipt = c
+
+ have_updated_badge = False
+ if read_receipt:
+ for receipt_part in read_receipt['content'].values():
+ if 'm.read' in receipt_part:
+ if self.user_id in receipt_part['m.read'].keys():
+ have_updated_badge = True
+
if not single_event:
+ if have_updated_badge:
+ yield self.update_badge()
self.last_token = chunk['end']
- logger.debug("Event stream timeout for pushkey %s", self.pushkey)
yield self.store.update_pusher_last_token(
self.app_id,
self.pushkey,
- self.user_name,
+ self.user_id,
self.last_token
)
return
@@ -150,29 +186,16 @@ class Pusher(object):
processed = False
rule_evaluator = yield \
- push_rule_evaluator.evaluator_for_user_name_and_profile_tag(
- self.user_name, self.profile_tag, single_event['room_id'], self.store
+ push_rule_evaluator.evaluator_for_user_id_and_profile_tag(
+ self.user_id, self.profile_tag, single_event['room_id'], self.store
)
actions = yield rule_evaluator.actions_for_event(single_event)
tweaks = rule_evaluator.tweaks_for_actions(actions)
- if len(actions) == 0:
- logger.warn("Empty actions! Using default action.")
- actions = Pusher.DEFAULT_ACTIONS
-
- if 'notify' not in actions and 'dont_notify' not in actions:
- logger.warn("Neither notify nor dont_notify in actions: adding default")
- actions.extend(Pusher.DEFAULT_ACTIONS)
-
- if 'dont_notify' in actions:
- logger.debug(
- "%s for %s: dont_notify",
- single_event['event_id'], self.user_name
- )
- processed = True
- else:
- rejected = yield self.dispatch_push(single_event, tweaks)
+ if 'notify' in actions:
+ self.badge = yield self._get_badge_count()
+ rejected = yield self.dispatch_push(single_event, tweaks, self.badge)
self.has_unread = True
if isinstance(rejected, list) or isinstance(rejected, tuple):
processed = True
@@ -190,8 +213,12 @@ class Pusher(object):
pk
)
yield self.hs.get_pusherpool().remove_pusher(
- self.app_id, pk, self.user_name
+ self.app_id, pk, self.user_id
)
+ else:
+ if have_updated_badge:
+ yield self.update_badge()
+ processed = True
if not self.alive:
return
@@ -202,7 +229,7 @@ class Pusher(object):
yield self.store.update_pusher_last_token_and_success(
self.app_id,
self.pushkey,
- self.user_name,
+ self.user_id,
self.last_token,
self.clock.time_msec()
)
@@ -211,7 +238,7 @@ class Pusher(object):
yield self.store.update_pusher_failing_since(
self.app_id,
self.pushkey,
- self.user_name,
+ self.user_id,
self.failing_since)
else:
if not self.failing_since:
@@ -219,7 +246,7 @@ class Pusher(object):
yield self.store.update_pusher_failing_since(
self.app_id,
self.pushkey,
- self.user_name,
+ self.user_id,
self.failing_since
)
@@ -231,13 +258,13 @@ class Pusher(object):
# of old notifications.
logger.warn("Giving up on a notification to user %s, "
"pushkey %s",
- self.user_name, self.pushkey)
+ self.user_id, self.pushkey)
self.backoff_delay = Pusher.INITIAL_BACKOFF
self.last_token = chunk['end']
yield self.store.update_pusher_last_token(
self.app_id,
self.pushkey,
- self.user_name,
+ self.user_id,
self.last_token
)
@@ -245,14 +272,14 @@ class Pusher(object):
yield self.store.update_pusher_failing_since(
self.app_id,
self.pushkey,
- self.user_name,
+ self.user_id,
self.failing_since
)
else:
logger.warn("Failed to dispatch push for user %s "
"(failing for %dms)."
"Trying again in %dms",
- self.user_name,
+ self.user_id,
self.clock.time_msec() - self.failing_since,
self.backoff_delay)
yield synapse.util.async.sleep(self.backoff_delay / 1000.0)
@@ -263,7 +290,7 @@ class Pusher(object):
def stop(self):
self.alive = False
- def dispatch_push(self, p, tweaks):
+ def dispatch_push(self, p, tweaks, badge):
"""
Overridden by implementing classes to actually deliver the notification
Args:
@@ -275,23 +302,44 @@ class Pusher(object):
"""
pass
- def reset_badge_count(self):
- pass
+ @defer.inlineCallbacks
+ def update_badge(self):
+ new_badge = yield self._get_badge_count()
+ if self.badge != new_badge:
+ self.badge = new_badge
+ yield self.send_badge(self.badge)
- def presence_changed(self, state):
+ def send_badge(self, badge):
"""
- We clear badge counts whenever a user's last_active time is bumped
- This is by no means perfect but I think it's the best we can do
- without read receipts.
+ Overridden by implementing classes to send an updated badge count
"""
- if 'last_active' in state.state:
- last_active = state.state['last_active']
- if last_active > self.last_last_active_time:
- self.last_last_active_time = last_active
- if self.has_unread:
- logger.info("Resetting badge count for %s", self.user_name)
- self.reset_badge_count()
- self.has_unread = False
+ pass
+
+ @defer.inlineCallbacks
+ def _get_badge_count(self):
+ invites, joins = yield defer.gatherResults([
+ self.store.get_invites_for_user(self.user_id),
+ self.store.get_rooms_for_user(self.user_id),
+ ], consumeErrors=True)
+
+ my_receipts_by_room = yield self.store.get_receipts_for_user(
+ self.user_id,
+ "m.read",
+ )
+
+ badge = len(invites)
+
+ for r in joins:
+ if r.room_id in my_receipts_by_room:
+ last_unread_event_id = my_receipts_by_room[r.room_id]
+
+ notifs = yield (
+ self.store.get_unread_event_push_actions_by_room_for_user(
+ r.room_id, self.user_id, last_unread_event_id
+ )
+ )
+ badge += notifs["notify_count"]
+ defer.returnValue(badge)
class PusherConfigException(Exception):
diff --git a/synapse/push/action_generator.py b/synapse/push/action_generator.py
new file mode 100644
index 0000000000..e0da0868ec
--- /dev/null
+++ b/synapse/push/action_generator.py
@@ -0,0 +1,48 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from twisted.internet import defer
+
+import bulk_push_rule_evaluator
+
+import logging
+
+logger = logging.getLogger(__name__)
+
+
+class ActionGenerator:
+ def __init__(self, hs):
+ self.hs = hs
+ self.store = hs.get_datastore()
+ # really we want to get all user ids and all profile tags too,
+ # since we want the actions for each profile tag for every user and
+ # also actions for a client with no profile tag for each user.
+ # Currently the event stream doesn't support profile tags on an
+ # event stream, so we just run the rules for a client with no profile
+ # tag (ie. we just need all the users).
+
+ @defer.inlineCallbacks
+ def handle_push_actions_for_event(self, event, context, handler):
+ bulk_evaluator = yield bulk_push_rule_evaluator.evaluator_for_room_id(
+ event.room_id, self.hs, self.store
+ )
+
+ actions_by_user = yield bulk_evaluator.action_for_event_by_user(
+ event, handler, context.current_state
+ )
+
+ context.push_actions = [
+ (uid, None, actions) for uid, actions in actions_by_user.items()
+ ]
diff --git a/synapse/push/baserules.py b/synapse/push/baserules.py
index 7f76382a17..0832c77cb4 100644
--- a/synapse/push/baserules.py
+++ b/synapse/push/baserules.py
@@ -1,4 +1,4 @@
-# Copyright 2015 OpenMarket Ltd
+# Copyright 2015, 2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -15,27 +15,25 @@
from synapse.push.rulekinds import PRIORITY_CLASS_MAP, PRIORITY_CLASS_INVERSE_MAP
-def list_with_base_rules(rawrules, user_name):
+def list_with_base_rules(rawrules):
ruleslist = []
# shove the server default rules for each kind onto the end of each
current_prio_class = PRIORITY_CLASS_INVERSE_MAP.keys()[-1]
ruleslist.extend(make_base_prepend_rules(
- user_name, PRIORITY_CLASS_INVERSE_MAP[current_prio_class]
+ PRIORITY_CLASS_INVERSE_MAP[current_prio_class]
))
for r in rawrules:
if r['priority_class'] < current_prio_class:
while r['priority_class'] < current_prio_class:
ruleslist.extend(make_base_append_rules(
- user_name,
PRIORITY_CLASS_INVERSE_MAP[current_prio_class]
))
current_prio_class -= 1
if current_prio_class > 0:
ruleslist.extend(make_base_prepend_rules(
- user_name,
PRIORITY_CLASS_INVERSE_MAP[current_prio_class]
))
@@ -43,223 +41,240 @@ def list_with_base_rules(rawrules, user_name):
while current_prio_class > 0:
ruleslist.extend(make_base_append_rules(
- user_name,
PRIORITY_CLASS_INVERSE_MAP[current_prio_class]
))
current_prio_class -= 1
if current_prio_class > 0:
ruleslist.extend(make_base_prepend_rules(
- user_name,
PRIORITY_CLASS_INVERSE_MAP[current_prio_class]
))
return ruleslist
-def make_base_append_rules(user, kind):
+def make_base_append_rules(kind):
rules = []
if kind == 'override':
- rules = make_base_append_override_rules()
+ rules = BASE_APPEND_OVRRIDE_RULES
elif kind == 'underride':
- rules = make_base_append_underride_rules(user)
+ rules = BASE_APPEND_UNDERRIDE_RULES
elif kind == 'content':
- rules = make_base_append_content_rules(user)
-
- for r in rules:
- r['priority_class'] = PRIORITY_CLASS_MAP[kind]
- r['default'] = True # Deprecated, left for backwards compat
+ rules = BASE_APPEND_CONTENT_RULES
return rules
-def make_base_prepend_rules(user, kind):
+def make_base_prepend_rules(kind):
rules = []
if kind == 'override':
- rules = make_base_prepend_override_rules()
-
- for r in rules:
- r['priority_class'] = PRIORITY_CLASS_MAP[kind]
- r['default'] = True # Deprecated, left for backwards compat
+ rules = BASE_PREPEND_OVERRIDE_RULES
return rules
-def make_base_append_content_rules(user):
- return [
- {
- 'rule_id': 'global/content/.m.rule.contains_user_name',
- 'conditions': [
- {
- 'kind': 'event_match',
- 'key': 'content.body',
- 'pattern': user.localpart, # Matrix ID match
- }
- ],
- 'actions': [
- 'notify',
- {
- 'set_tweak': 'sound',
- 'value': 'default',
- }, {
- 'set_tweak': 'highlight'
- }
- ]
- },
- ]
+BASE_APPEND_CONTENT_RULES = [
+ {
+ 'rule_id': 'global/content/.m.rule.contains_user_name',
+ 'conditions': [
+ {
+ 'kind': 'event_match',
+ 'key': 'content.body',
+ 'pattern_type': 'user_localpart'
+ }
+ ],
+ 'actions': [
+ 'notify',
+ {
+ 'set_tweak': 'sound',
+ 'value': 'default',
+ }, {
+ 'set_tweak': 'highlight'
+ }
+ ]
+ },
+]
+
+
+BASE_PREPEND_OVERRIDE_RULES = [
+ {
+ 'rule_id': 'global/override/.m.rule.master',
+ 'enabled': False,
+ 'conditions': [],
+ 'actions': [
+ "dont_notify"
+ ]
+ }
+]
+
+
+BASE_APPEND_OVRRIDE_RULES = [
+ {
+ 'rule_id': 'global/override/.m.rule.suppress_notices',
+ 'conditions': [
+ {
+ 'kind': 'event_match',
+ 'key': 'content.msgtype',
+ 'pattern': 'm.notice',
+ '_id': '_suppress_notices',
+ }
+ ],
+ 'actions': [
+ 'dont_notify',
+ ]
+ }
+]
+
+BASE_APPEND_UNDERRIDE_RULES = [
+ {
+ 'rule_id': 'global/underride/.m.rule.call',
+ 'conditions': [
+ {
+ 'kind': 'event_match',
+ 'key': 'type',
+ 'pattern': 'm.call.invite',
+ '_id': '_call',
+ }
+ ],
+ 'actions': [
+ 'notify',
+ {
+ 'set_tweak': 'sound',
+ 'value': 'ring'
+ }, {
+ 'set_tweak': 'highlight',
+ 'value': False
+ }
+ ]
+ },
+ {
+ 'rule_id': 'global/underride/.m.rule.contains_display_name',
+ 'conditions': [
+ {
+ 'kind': 'contains_display_name'
+ }
+ ],
+ 'actions': [
+ 'notify',
+ {
+ 'set_tweak': 'sound',
+ 'value': 'default'
+ }, {
+ 'set_tweak': 'highlight'
+ }
+ ]
+ },
+ {
+ 'rule_id': 'global/underride/.m.rule.room_one_to_one',
+ 'conditions': [
+ {
+ 'kind': 'room_member_count',
+ 'is': '2',
+ '_id': 'member_count',
+ },
+ {
+ 'kind': 'event_match',
+ 'key': 'type',
+ 'pattern': 'm.room.message',
+ '_id': '_message',
+ }
+ ],
+ 'actions': [
+ 'notify',
+ {
+ 'set_tweak': 'sound',
+ 'value': 'default'
+ }, {
+ 'set_tweak': 'highlight',
+ 'value': False
+ }
+ ]
+ },
+ {
+ 'rule_id': 'global/underride/.m.rule.invite_for_me',
+ 'conditions': [
+ {
+ 'kind': 'event_match',
+ 'key': 'type',
+ 'pattern': 'm.room.member',
+ '_id': '_member',
+ },
+ {
+ 'kind': 'event_match',
+ 'key': 'content.membership',
+ 'pattern': 'invite',
+ '_id': '_invite_member',
+ },
+ {
+ 'kind': 'event_match',
+ 'key': 'state_key',
+ 'pattern_type': 'user_id'
+ },
+ ],
+ 'actions': [
+ 'notify',
+ {
+ 'set_tweak': 'sound',
+ 'value': 'default'
+ }, {
+ 'set_tweak': 'highlight',
+ 'value': False
+ }
+ ]
+ },
+ # This is too simple: https://matrix.org/jira/browse/SYN-607
+ # Removing for now
+ # {
+ # 'rule_id': 'global/underride/.m.rule.member_event',
+ # 'conditions': [
+ # {
+ # 'kind': 'event_match',
+ # 'key': 'type',
+ # 'pattern': 'm.room.member',
+ # '_id': '_member',
+ # }
+ # ],
+ # 'actions': [
+ # 'notify', {
+ # 'set_tweak': 'highlight',
+ # 'value': False
+ # }
+ # ]
+ # },
+ {
+ 'rule_id': 'global/underride/.m.rule.message',
+ 'conditions': [
+ {
+ 'kind': 'event_match',
+ 'key': 'type',
+ 'pattern': 'm.room.message',
+ '_id': '_message',
+ }
+ ],
+ 'actions': [
+ 'notify', {
+ 'set_tweak': 'highlight',
+ 'value': False
+ }
+ ]
+ }
+]
-def make_base_prepend_override_rules():
- return [
- {
- 'rule_id': 'global/override/.m.rule.master',
- 'enabled': False,
- 'conditions': [],
- 'actions': [
- "dont_notify"
- ]
- }
- ]
+for r in BASE_APPEND_CONTENT_RULES:
+ r['priority_class'] = PRIORITY_CLASS_MAP['content']
+ r['default'] = True
-def make_base_append_override_rules():
- return [
- {
- 'rule_id': 'global/override/.m.rule.suppress_notices',
- 'conditions': [
- {
- 'kind': 'event_match',
- 'key': 'content.msgtype',
- 'pattern': 'm.notice',
- }
- ],
- 'actions': [
- 'dont_notify',
- ]
- }
- ]
+for r in BASE_PREPEND_OVERRIDE_RULES:
+ r['priority_class'] = PRIORITY_CLASS_MAP['override']
+ r['default'] = True
+for r in BASE_APPEND_OVRRIDE_RULES:
+ r['priority_class'] = PRIORITY_CLASS_MAP['override']
+ r['default'] = True
-def make_base_append_underride_rules(user):
- return [
- {
- 'rule_id': 'global/underride/.m.rule.call',
- 'conditions': [
- {
- 'kind': 'event_match',
- 'key': 'type',
- 'pattern': 'm.call.invite',
- }
- ],
- 'actions': [
- 'notify',
- {
- 'set_tweak': 'sound',
- 'value': 'ring'
- }, {
- 'set_tweak': 'highlight',
- 'value': False
- }
- ]
- },
- {
- 'rule_id': 'global/underride/.m.rule.contains_display_name',
- 'conditions': [
- {
- 'kind': 'contains_display_name'
- }
- ],
- 'actions': [
- 'notify',
- {
- 'set_tweak': 'sound',
- 'value': 'default'
- }, {
- 'set_tweak': 'highlight'
- }
- ]
- },
- {
- 'rule_id': 'global/underride/.m.rule.room_one_to_one',
- 'conditions': [
- {
- 'kind': 'room_member_count',
- 'is': '2'
- }
- ],
- 'actions': [
- 'notify',
- {
- 'set_tweak': 'sound',
- 'value': 'default'
- }, {
- 'set_tweak': 'highlight',
- 'value': False
- }
- ]
- },
- {
- 'rule_id': 'global/underride/.m.rule.invite_for_me',
- 'conditions': [
- {
- 'kind': 'event_match',
- 'key': 'type',
- 'pattern': 'm.room.member',
- },
- {
- 'kind': 'event_match',
- 'key': 'content.membership',
- 'pattern': 'invite',
- },
- {
- 'kind': 'event_match',
- 'key': 'state_key',
- 'pattern': user.to_string(),
- },
- ],
- 'actions': [
- 'notify',
- {
- 'set_tweak': 'sound',
- 'value': 'default'
- }, {
- 'set_tweak': 'highlight',
- 'value': False
- }
- ]
- },
- {
- 'rule_id': 'global/underride/.m.rule.member_event',
- 'conditions': [
- {
- 'kind': 'event_match',
- 'key': 'type',
- 'pattern': 'm.room.member',
- }
- ],
- 'actions': [
- 'notify', {
- 'set_tweak': 'highlight',
- 'value': False
- }
- ]
- },
- {
- 'rule_id': 'global/underride/.m.rule.message',
- 'enabled': False,
- 'conditions': [
- {
- 'kind': 'event_match',
- 'key': 'type',
- 'pattern': 'm.room.message',
- }
- ],
- 'actions': [
- 'notify', {
- 'set_tweak': 'highlight',
- 'value': False
- }
- ]
- }
- ]
+for r in BASE_APPEND_UNDERRIDE_RULES:
+ r['priority_class'] = PRIORITY_CLASS_MAP['underride']
+ r['default'] = True
diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py
new file mode 100644
index 0000000000..8ac5ceb9ef
--- /dev/null
+++ b/synapse/push/bulk_push_rule_evaluator.py
@@ -0,0 +1,162 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+import ujson as json
+
+from twisted.internet import defer
+
+import baserules
+from push_rule_evaluator import PushRuleEvaluatorForEvent
+
+from synapse.api.constants import EventTypes
+
+
+logger = logging.getLogger(__name__)
+
+
+def decode_rule_json(rule):
+ rule['conditions'] = json.loads(rule['conditions'])
+ rule['actions'] = json.loads(rule['actions'])
+ return rule
+
+
+@defer.inlineCallbacks
+def _get_rules(room_id, user_ids, store):
+ rules_by_user = yield store.bulk_get_push_rules(user_ids)
+ rules_enabled_by_user = yield store.bulk_get_push_rules_enabled(user_ids)
+
+ rules_by_user = {
+ uid: baserules.list_with_base_rules([
+ decode_rule_json(rule_list)
+ for rule_list in rules_by_user.get(uid, [])
+ ])
+ for uid in user_ids
+ }
+
+ # We apply the rules-enabled map here: bulk_get_push_rules doesn't
+ # fetch disabled rules, but this won't account for any server default
+ # rules the user has disabled, so we need to do this too.
+ for uid in user_ids:
+ if uid not in rules_enabled_by_user:
+ continue
+
+ user_enabled_map = rules_enabled_by_user[uid]
+
+ for i, rule in enumerate(rules_by_user[uid]):
+ rule_id = rule['rule_id']
+
+ if rule_id in user_enabled_map:
+ if rule.get('enabled', True) != bool(user_enabled_map[rule_id]):
+ # Rules are cached across users.
+ rule = dict(rule)
+ rule['enabled'] = bool(user_enabled_map[rule_id])
+ rules_by_user[uid][i] = rule
+
+ defer.returnValue(rules_by_user)
+
+
+@defer.inlineCallbacks
+def evaluator_for_room_id(room_id, hs, store):
+ results = yield store.get_receipts_for_room(room_id, "m.read")
+ user_ids = [
+ row["user_id"] for row in results
+ if hs.is_mine_id(row["user_id"])
+ ]
+ rules_by_user = yield _get_rules(room_id, user_ids, store)
+
+ defer.returnValue(BulkPushRuleEvaluator(
+ room_id, rules_by_user, user_ids, store
+ ))
+
+
+class BulkPushRuleEvaluator:
+ """
+ Runs push rules for all users in a room.
+ This is faster than running PushRuleEvaluator for each user because it
+ fetches all the rules for all the users in one (batched) db query
+ rather than doing multiple queries per-user. It currently uses
+ the same logic to run the actual rules, but could be optimised further
+ (see https://matrix.org/jira/browse/SYN-562)
+ """
+ def __init__(self, room_id, rules_by_user, users_in_room, store):
+ self.room_id = room_id
+ self.rules_by_user = rules_by_user
+ self.users_in_room = users_in_room
+ self.store = store
+
+ @defer.inlineCallbacks
+ def action_for_event_by_user(self, event, handler, current_state):
+ actions_by_user = {}
+
+ users_dict = yield self.store.are_guests(self.rules_by_user.keys())
+
+ filtered_by_user = yield handler._filter_events_for_clients(
+ users_dict.items(), [event], {event.event_id: current_state}
+ )
+
+ evaluator = PushRuleEvaluatorForEvent(event, len(self.users_in_room))
+
+ condition_cache = {}
+
+ display_names = {}
+ for ev in current_state.values():
+ nm = ev.content.get("displayname", None)
+ if nm and ev.type == EventTypes.Member:
+ display_names[ev.state_key] = nm
+
+ for uid, rules in self.rules_by_user.items():
+ display_name = display_names.get(uid, None)
+
+ filtered = filtered_by_user[uid]
+ if len(filtered) == 0:
+ continue
+
+ if filtered[0].sender == uid:
+ continue
+
+ for rule in rules:
+ if 'enabled' in rule and not rule['enabled']:
+ continue
+
+ matches = _condition_checker(
+ evaluator, rule['conditions'], uid, display_name, condition_cache
+ )
+ if matches:
+ actions = [x for x in rule['actions'] if x != 'dont_notify']
+ if actions and 'notify' in actions:
+ actions_by_user[uid] = actions
+ break
+ defer.returnValue(actions_by_user)
+
+
+def _condition_checker(evaluator, conditions, uid, display_name, cache):
+ for cond in conditions:
+ _id = cond.get("_id", None)
+ if _id:
+ res = cache.get(_id, None)
+ if res is False:
+ return False
+ elif res is True:
+ continue
+
+ res = evaluator.matches(cond, uid, display_name, None)
+ if _id:
+ cache[_id] = bool(res)
+
+ if not res:
+ return False
+
+ return True
diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py
index 5160775e59..cdc4494928 100644
--- a/synapse/push/httppusher.py
+++ b/synapse/push/httppusher.py
@@ -1,5 +1,5 @@
# -*- coding: utf-8 -*-
-# Copyright 2015 OpenMarket Ltd
+# Copyright 2015, 2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -23,13 +23,13 @@ logger = logging.getLogger(__name__)
class HttpPusher(Pusher):
- def __init__(self, _hs, profile_tag, user_name, app_id,
+ def __init__(self, _hs, profile_tag, user_id, app_id,
app_display_name, device_display_name, pushkey, pushkey_ts,
data, last_token, last_success, failing_since):
super(HttpPusher, self).__init__(
_hs,
profile_tag,
- user_name,
+ user_id,
app_id,
app_display_name,
device_display_name,
@@ -51,7 +51,7 @@ class HttpPusher(Pusher):
del self.data_minus_url['url']
@defer.inlineCallbacks
- def _build_notification_dict(self, event, tweaks):
+ def _build_notification_dict(self, event, tweaks, badge):
# we probably do not want to push for every presence update
# (we may want to be able to set up notifications when specific
# people sign in, but we'd want to only deliver the pertinent ones)
@@ -71,7 +71,7 @@ class HttpPusher(Pusher):
'counts': { # -- we don't mark messages as read yet so
# we have no way of knowing
# Just set the badge to 1 until we have read receipts
- 'unread': 1,
+ 'unread': badge,
# 'missed_calls': 2
},
'devices': [
@@ -87,7 +87,7 @@ class HttpPusher(Pusher):
}
if event['type'] == 'm.room.member':
d['notification']['membership'] = event['content']['membership']
- d['notification']['user_is_target'] = event['state_key'] == self.user_name
+ d['notification']['user_is_target'] = event['state_key'] == self.user_id
if 'content' in event:
d['notification']['content'] = event['content']
@@ -101,8 +101,8 @@ class HttpPusher(Pusher):
defer.returnValue(d)
@defer.inlineCallbacks
- def dispatch_push(self, event, tweaks):
- notification_dict = yield self._build_notification_dict(event, tweaks)
+ def dispatch_push(self, event, tweaks, badge):
+ notification_dict = yield self._build_notification_dict(event, tweaks, badge)
if not notification_dict:
defer.returnValue([])
try:
@@ -116,15 +116,15 @@ class HttpPusher(Pusher):
defer.returnValue(rejected)
@defer.inlineCallbacks
- def reset_badge_count(self):
+ def send_badge(self, badge):
+ logger.info("Sending updated badge count %d to %r", badge, self.user_id)
d = {
'notification': {
'id': '',
'type': None,
'sender': '',
'counts': {
- 'unread': 0,
- 'missed_calls': 0
+ 'unread': badge
},
'devices': [
{
diff --git a/synapse/push/push_rule_evaluator.py b/synapse/push/push_rule_evaluator.py
index 92c7fd048f..2a2b4437dc 100644
--- a/synapse/push/push_rule_evaluator.py
+++ b/synapse/push/push_rule_evaluator.py
@@ -1,5 +1,5 @@
# -*- coding: utf-8 -*-
-# Copyright 2015 OpenMarket Ltd
+# Copyright 2015, 2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -15,40 +15,71 @@
from twisted.internet import defer
-from synapse.types import UserID
-
import baserules
import logging
import simplejson as json
import re
+from synapse.types import UserID
+from synapse.util.caches.lrucache import LruCache
+
logger = logging.getLogger(__name__)
+GLOB_REGEX = re.compile(r'\\\[(\\\!|)(.*)\\\]')
+IS_GLOB = re.compile(r'[\?\*\[\]]')
+INEQUALITY_EXPR = re.compile("^([=<>]*)([0-9]*)$")
+
+
@defer.inlineCallbacks
-def evaluator_for_user_name_and_profile_tag(user_name, profile_tag, room_id, store):
- rawrules = yield store.get_push_rules_for_user(user_name)
- enabled_map = yield store.get_push_rules_enabled_for_user(user_name)
+def evaluator_for_user_id_and_profile_tag(user_id, profile_tag, room_id, store):
+ rawrules = yield store.get_push_rules_for_user(user_id)
+ enabled_map = yield store.get_push_rules_enabled_for_user(user_id)
our_member_event = yield store.get_current_state(
room_id=room_id,
event_type='m.room.member',
- state_key=user_name,
+ state_key=user_id,
)
defer.returnValue(PushRuleEvaluator(
- user_name, profile_tag, rawrules, enabled_map,
+ user_id, profile_tag, rawrules, enabled_map,
room_id, our_member_event, store
))
+def _room_member_count(ev, condition, room_member_count):
+ if 'is' not in condition:
+ return False
+ m = INEQUALITY_EXPR.match(condition['is'])
+ if not m:
+ return False
+ ineq = m.group(1)
+ rhs = m.group(2)
+ if not rhs.isdigit():
+ return False
+ rhs = int(rhs)
+
+ if ineq == '' or ineq == '==':
+ return room_member_count == rhs
+ elif ineq == '<':
+ return room_member_count < rhs
+ elif ineq == '>':
+ return room_member_count > rhs
+ elif ineq == '>=':
+ return room_member_count >= rhs
+ elif ineq == '<=':
+ return room_member_count <= rhs
+ else:
+ return False
+
+
class PushRuleEvaluator:
- DEFAULT_ACTIONS = ['dont_notify']
- INEQUALITY_EXPR = re.compile("^([=<>]*)([0-9]*)$")
+ DEFAULT_ACTIONS = []
- def __init__(self, user_name, profile_tag, raw_rules, enabled_map, room_id,
+ def __init__(self, user_id, profile_tag, raw_rules, enabled_map, room_id,
our_member_event, store):
- self.user_name = user_name
+ self.user_id = user_id
self.profile_tag = profile_tag
self.room_id = room_id
self.our_member_event = our_member_event
@@ -61,8 +92,7 @@ class PushRuleEvaluator:
rule['actions'] = json.loads(raw_rule['actions'])
rules.append(rule)
- user = UserID.from_string(self.user_name)
- self.rules = baserules.list_with_base_rules(rules, user)
+ self.rules = baserules.list_with_base_rules(rules)
self.enabled_map = enabled_map
@@ -83,9 +113,9 @@ class PushRuleEvaluator:
has configured both globally and per-room when we have the ability
to do such things.
"""
- if ev['user_id'] == self.user_name:
+ if ev['user_id'] == self.user_id:
# let's assume you probably know about messages you sent yourself
- defer.returnValue(['dont_notify'])
+ defer.returnValue([])
room_id = ev['room_id']
@@ -98,127 +128,195 @@ class PushRuleEvaluator:
room_members = yield self.store.get_users_in_room(room_id)
room_member_count = len(room_members)
+ evaluator = PushRuleEvaluatorForEvent(ev, room_member_count)
+
for r in self.rules:
- if r['rule_id'] in self.enabled_map:
- r['enabled'] = self.enabled_map[r['rule_id']]
- elif 'enabled' not in r:
- r['enabled'] = True
- if not r['enabled']:
+ enabled = self.enabled_map.get(r['rule_id'], None)
+ if enabled is not None and not enabled:
+ continue
+
+ if not r.get("enabled", True):
continue
- matches = True
conditions = r['conditions']
actions = r['actions']
- for c in conditions:
- matches &= self._event_fulfills_condition(
- ev, c, display_name=my_display_name,
- room_member_count=room_member_count
- )
- logger.debug(
- "Rule %s %s",
- r['rule_id'], "matches" if matches else "doesn't match"
- )
# ignore rules with no actions (we have an explict 'dont_notify')
if len(actions) == 0:
logger.warn(
"Ignoring rule id %s with no actions for user %s",
- r['rule_id'], self.user_name
+ r['rule_id'], self.user_id
)
continue
+
+ matches = True
+ for c in conditions:
+ matches = evaluator.matches(
+ c, self.user_id, my_display_name, self.profile_tag
+ )
+ if not matches:
+ break
+
+ logger.debug(
+ "Rule %s %s",
+ r['rule_id'], "matches" if matches else "doesn't match"
+ )
+
if matches:
- logger.info(
+ logger.debug(
"%s matches for user %s, event %s",
- r['rule_id'], self.user_name, ev['event_id']
+ r['rule_id'], self.user_id, ev['event_id']
)
+
+ # filter out dont_notify as we treat an empty actions list
+ # as dont_notify, and this doesn't take up a row in our database
+ actions = [x for x in actions if x != 'dont_notify']
+
defer.returnValue(actions)
- logger.info(
+ logger.debug(
"No rules match for user %s, event %s",
- self.user_name, ev['event_id']
+ self.user_id, ev['event_id']
)
defer.returnValue(PushRuleEvaluator.DEFAULT_ACTIONS)
- @staticmethod
- def _glob_to_regexp(glob):
- r = re.escape(glob)
- r = re.sub(r'\\\*', r'.*?', r)
- r = re.sub(r'\\\?', r'.', r)
-
- # handle [abc], [a-z] and [!a-z] style ranges.
- r = re.sub(r'\\\[(\\\!|)(.*)\\\]',
- lambda x: ('[%s%s]' % (x.group(1) and '^' or '',
- re.sub(r'\\\-', '-', x.group(2)))), r)
- return r
- def _event_fulfills_condition(self, ev, condition, display_name, room_member_count):
- if condition['kind'] == 'event_match':
- if 'pattern' not in condition:
- logger.warn("event_match condition with no pattern")
- return False
- # XXX: optimisation: cache our pattern regexps
- if condition['key'] == 'content.body':
- r = r'\b%s\b' % self._glob_to_regexp(condition['pattern'])
- else:
- r = r'^%s$' % self._glob_to_regexp(condition['pattern'])
- val = _value_for_dotted_key(condition['key'], ev)
- if val is None:
- return False
- return re.search(r, val, flags=re.IGNORECASE) is not None
+class PushRuleEvaluatorForEvent(object):
+ def __init__(self, event, room_member_count):
+ self._event = event
+ self._room_member_count = room_member_count
+ # Maps strings of e.g. 'content.body' -> event["content"]["body"]
+ self._value_cache = _flatten_dict(event)
+
+ def matches(self, condition, user_id, display_name, profile_tag):
+ if condition['kind'] == 'event_match':
+ return self._event_match(condition, user_id)
elif condition['kind'] == 'device':
if 'profile_tag' not in condition:
return True
- return condition['profile_tag'] == self.profile_tag
-
+ return condition['profile_tag'] == profile_tag
elif condition['kind'] == 'contains_display_name':
- # This is special because display names can be different
- # between rooms and so you can't really hard code it in a rule.
- # Optimisation: we should cache these names and update them from
- # the event stream.
- if 'content' not in ev or 'body' not in ev['content']:
- return False
- if not display_name:
- return False
- return re.search(
- r"\b%s\b" % re.escape(display_name), ev['content']['body'],
- flags=re.IGNORECASE
- ) is not None
-
+ return self._contains_display_name(display_name)
elif condition['kind'] == 'room_member_count':
- if 'is' not in condition:
- return False
- m = PushRuleEvaluator.INEQUALITY_EXPR.match(condition['is'])
- if not m:
+ return _room_member_count(
+ self._event, condition, self._room_member_count
+ )
+ else:
+ return True
+
+ def _event_match(self, condition, user_id):
+ pattern = condition.get('pattern', None)
+
+ if not pattern:
+ pattern_type = condition.get('pattern_type', None)
+ if pattern_type == "user_id":
+ pattern = user_id
+ elif pattern_type == "user_localpart":
+ pattern = UserID.from_string(user_id).localpart
+
+ if not pattern:
+ logger.warn("event_match condition with no pattern")
+ return False
+
+ # XXX: optimisation: cache our pattern regexps
+ if condition['key'] == 'content.body':
+ body = self._event["content"].get("body", None)
+ if not body:
return False
- ineq = m.group(1)
- rhs = m.group(2)
- if not rhs.isdigit():
+
+ return _glob_matches(pattern, body, word_boundary=True)
+ else:
+ haystack = self._get_value(condition['key'])
+ if haystack is None:
return False
- rhs = int(rhs)
-
- if ineq == '' or ineq == '==':
- return room_member_count == rhs
- elif ineq == '<':
- return room_member_count < rhs
- elif ineq == '>':
- return room_member_count > rhs
- elif ineq == '>=':
- return room_member_count >= rhs
- elif ineq == '<=':
- return room_member_count <= rhs
+
+ return _glob_matches(pattern, haystack)
+
+ def _contains_display_name(self, display_name):
+ if not display_name:
+ return False
+
+ body = self._event["content"].get("body", None)
+ if not body:
+ return False
+
+ return _glob_matches(display_name, body, word_boundary=True)
+
+ def _get_value(self, dotted_key):
+ return self._value_cache.get(dotted_key, None)
+
+
+def _glob_matches(glob, value, word_boundary=False):
+ """Tests if value matches glob.
+
+ Args:
+ glob (string)
+ value (string): String to test against glob.
+ word_boundary (bool): Whether to match against word boundaries or entire
+ string. Defaults to False.
+
+ Returns:
+ bool
+ """
+ try:
+ if IS_GLOB.search(glob):
+ r = re.escape(glob)
+
+ r = r.replace(r'\*', '.*?')
+ r = r.replace(r'\?', '.')
+
+ # handle [abc], [a-z] and [!a-z] style ranges.
+ r = GLOB_REGEX.sub(
+ lambda x: (
+ '[%s%s]' % (
+ x.group(1) and '^' or '',
+ x.group(2).replace(r'\\\-', '-')
+ )
+ ),
+ r,
+ )
+ if word_boundary:
+ r = r"\b%s\b" % (r,)
+ r = _compile_regex(r)
+
+ return r.search(value)
else:
- return False
+ r = r + "$"
+ r = _compile_regex(r)
+
+ return r.match(value)
+ elif word_boundary:
+ r = re.escape(glob)
+ r = r"\b%s\b" % (r,)
+ r = _compile_regex(r)
+
+ return r.search(value)
else:
- return True
+ return value.lower() == glob.lower()
+ except re.error:
+ logger.warn("Failed to parse glob to regex: %r", glob)
+ return False
+
+
+def _flatten_dict(d, prefix=[], result={}):
+ for key, value in d.items():
+ if isinstance(value, basestring):
+ result[".".join(prefix + [key])] = value.lower()
+ elif hasattr(value, "items"):
+ _flatten_dict(value, prefix=(prefix + [key]), result=result)
+
+ return result
-def _value_for_dotted_key(dotted_key, event):
- parts = dotted_key.split(".")
- val = event
- while len(parts) > 0:
- if parts[0] not in val:
- return None
- val = val[parts[0]]
- parts = parts[1:]
- return val
+regex_cache = LruCache(5000)
+
+
+def _compile_regex(regex_str):
+ r = regex_cache.get(regex_str, None)
+ if r:
+ return r
+
+ r = re.compile(regex_str, flags=re.IGNORECASE)
+ regex_cache[regex_str] = r
+ return r
diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py
index e012c565ee..d7dcb2de4b 100644
--- a/synapse/push/pusherpool.py
+++ b/synapse/push/pusherpool.py
@@ -1,6 +1,6 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
-# Copyright 2015 OpenMarket Ltd
+# Copyright 2015, 2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -18,6 +18,7 @@ from twisted.internet import defer
from httppusher import HttpPusher
from synapse.push import PusherConfigException
+from synapse.util.logcontext import preserve_fn
import logging
@@ -31,35 +32,20 @@ class PusherPool:
self.pushers = {}
self.last_pusher_started = -1
- distributor = self.hs.get_distributor()
- distributor.observe(
- "user_presence_changed", self.user_presence_changed
- )
-
- @defer.inlineCallbacks
- def user_presence_changed(self, user, state):
- user_name = user.to_string()
-
- # until we have read receipts, pushers use this to reset a user's
- # badge counters to zero
- for p in self.pushers.values():
- if p.user_name == user_name:
- yield p.presence_changed(state)
-
@defer.inlineCallbacks
def start(self):
pushers = yield self.store.get_all_pushers()
self._start_pushers(pushers)
@defer.inlineCallbacks
- def add_pusher(self, user_name, access_token, profile_tag, kind, app_id,
+ def add_pusher(self, user_id, access_token, profile_tag, kind, app_id,
app_display_name, device_display_name, pushkey, lang, data):
# we try to create the pusher just to validate the config: it
# will then get pulled out of the database,
# recreated, added and started: this means we have only one
# code path adding pushers.
self._create_pusher({
- "user_name": user_name,
+ "user_name": user_id,
"kind": kind,
"profile_tag": profile_tag,
"app_id": app_id,
@@ -74,7 +60,7 @@ class PusherPool:
"failing_since": None
})
yield self._add_pusher_to_store(
- user_name, access_token, profile_tag, kind, app_id,
+ user_id, access_token, profile_tag, kind, app_id,
app_display_name, device_display_name,
pushkey, lang, data
)
@@ -91,7 +77,7 @@ class PusherPool:
"Removing pusher for app id %s, pushkey %s, user %s",
app_id, pushkey, p['user_name']
)
- self.remove_pusher(p['app_id'], p['pushkey'], p['user_name'])
+ yield self.remove_pusher(p['app_id'], p['pushkey'], p['user_name'])
@defer.inlineCallbacks
def remove_pushers_by_user(self, user_id):
@@ -106,14 +92,14 @@ class PusherPool:
"Removing pusher for app id %s, pushkey %s, user %s",
p['app_id'], p['pushkey'], p['user_name']
)
- self.remove_pusher(p['app_id'], p['pushkey'], p['user_name'])
+ yield self.remove_pusher(p['app_id'], p['pushkey'], p['user_name'])
@defer.inlineCallbacks
- def _add_pusher_to_store(self, user_name, access_token, profile_tag, kind,
+ def _add_pusher_to_store(self, user_id, access_token, profile_tag, kind,
app_id, app_display_name, device_display_name,
pushkey, lang, data):
yield self.store.add_pusher(
- user_name=user_name,
+ user_id=user_id,
access_token=access_token,
profile_tag=profile_tag,
kind=kind,
@@ -125,14 +111,14 @@ class PusherPool:
lang=lang,
data=data,
)
- self._refresh_pusher(app_id, pushkey, user_name)
+ yield self._refresh_pusher(app_id, pushkey, user_id)
def _create_pusher(self, pusherdict):
if pusherdict['kind'] == 'http':
return HttpPusher(
self.hs,
profile_tag=pusherdict['profile_tag'],
- user_name=pusherdict['user_name'],
+ user_id=pusherdict['user_name'],
app_id=pusherdict['app_id'],
app_display_name=pusherdict['app_display_name'],
device_display_name=pusherdict['device_display_name'],
@@ -150,14 +136,14 @@ class PusherPool:
)
@defer.inlineCallbacks
- def _refresh_pusher(self, app_id, pushkey, user_name):
+ def _refresh_pusher(self, app_id, pushkey, user_id):
resultlist = yield self.store.get_pushers_by_app_id_and_pushkey(
app_id, pushkey
)
p = None
for r in resultlist:
- if r['user_name'] == user_name:
+ if r['user_name'] == user_id:
p = r
if p:
@@ -181,17 +167,17 @@ class PusherPool:
if fullid in self.pushers:
self.pushers[fullid].stop()
self.pushers[fullid] = p
- p.start()
+ preserve_fn(p.start)()
logger.info("Started pushers")
@defer.inlineCallbacks
- def remove_pusher(self, app_id, pushkey, user_name):
- fullid = "%s:%s:%s" % (app_id, pushkey, user_name)
+ def remove_pusher(self, app_id, pushkey, user_id):
+ fullid = "%s:%s:%s" % (app_id, pushkey, user_id)
if fullid in self.pushers:
logger.info("Stopping pusher %s", fullid)
self.pushers[fullid].stop()
del self.pushers[fullid]
- yield self.store.delete_pusher_by_app_id_pushkey_user_name(
- app_id, pushkey, user_name
+ yield self.store.delete_pusher_by_app_id_pushkey_user_id(
+ app_id, pushkey, user_id
)
diff --git a/synapse/push/rulekinds.py b/synapse/push/rulekinds.py
index 4c591aa638..4cae48ac07 100644
--- a/synapse/push/rulekinds.py
+++ b/synapse/push/rulekinds.py
@@ -1,4 +1,4 @@
-# Copyright 2015 OpenMarket Ltd
+# Copyright 2015, 2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
|