diff --git a/synapse/push/__init__.py b/synapse/push/__init__.py
index e6a28bd8c0..65ef1b68a3 100644
--- a/synapse/push/__init__.py
+++ b/synapse/push/__init__.py
@@ -17,10 +17,11 @@ from twisted.internet import defer
from synapse.streams.config import PaginationConfig
from synapse.types import StreamToken
-from synapse.api.constants import Membership
+from synapse.util.logcontext import LoggingContext
+from synapse.util.metrics import Measure
import synapse.util.async
-import push_rule_evaluator as push_rule_evaluator
+from .push_rule_evaluator import evaluator_for_user_id
import logging
import random
@@ -28,6 +29,16 @@ import random
logger = logging.getLogger(__name__)
+_NEXT_ID = 1
+
+
+def _get_next_id():
+ global _NEXT_ID
+ _id = _NEXT_ID
+ _NEXT_ID += 1
+ return _id
+
+
# Pushers could now be moved to pull out of the event_push_actions table instead
# of listening on the event stream: this would avoid them having to run the
# rules again.
@@ -36,14 +47,13 @@ class Pusher(object):
MAX_BACKOFF = 60 * 60 * 1000
GIVE_UP_AFTER = 24 * 60 * 60 * 1000
- def __init__(self, _hs, profile_tag, user_id, app_id,
+ def __init__(self, _hs, user_id, app_id,
app_display_name, device_display_name, pushkey, pushkey_ts,
data, last_token, last_success, failing_since):
self.hs = _hs
self.evStreamHandler = self.hs.get_handlers().event_stream_handler
self.store = self.hs.get_datastore()
self.clock = self.hs.get_clock()
- self.profile_tag = profile_tag
self.user_id = user_id
self.app_id = app_id
self.app_display_name = app_display_name
@@ -58,6 +68,8 @@ class Pusher(object):
self.alive = True
self.badge = None
+ self.name = "Pusher-%d" % (_get_next_id(),)
+
# The last value of last_active_time that we saw
self.last_last_active_time = 0
self.has_unread = True
@@ -87,38 +99,46 @@ class Pusher(object):
@defer.inlineCallbacks
def start(self):
- if not self.last_token:
- # First-time setup: get a token to start from (we can't
- # just start from no token, ie. 'now'
- # because we need the result to be reproduceable in case
- # we fail to dispatch the push)
- config = PaginationConfig(from_token=None, limit='1')
- chunk = yield self.evStreamHandler.get_stream(
- self.user_id, config, timeout=0, affect_presence=False
- )
- self.last_token = chunk['end']
- self.store.update_pusher_last_token(
- self.app_id, self.pushkey, self.user_id, self.last_token
- )
- logger.info("Pusher %s for user %s starting from token %s",
- self.pushkey, self.user_id, self.last_token)
-
- wait = 0
- while self.alive:
- try:
- if wait > 0:
- yield synapse.util.async.sleep(wait)
- yield self.get_and_dispatch()
- wait = 0
- except:
- if wait == 0:
- wait = 1
- else:
- wait = min(wait * 2, 1800)
- logger.exception(
- "Exception in pusher loop for pushkey %s. Pausing for %ds",
- self.pushkey, wait
+ with LoggingContext(self.name):
+ if not self.last_token:
+ # First-time setup: get a token to start from (we can't
+ # just start from no token, ie. 'now'
+ # because we need the result to be reproduceable in case
+ # we fail to dispatch the push)
+ config = PaginationConfig(from_token=None, limit='1')
+ chunk = yield self.evStreamHandler.get_stream(
+ self.user_id, config, timeout=0, affect_presence=False
+ )
+ self.last_token = chunk['end']
+ yield self.store.update_pusher_last_token(
+ self.app_id, self.pushkey, self.user_id, self.last_token
)
+ logger.info("New pusher %s for user %s starting from token %s",
+ self.pushkey, self.user_id, self.last_token)
+
+ else:
+ logger.info(
+ "Old pusher %s for user %s starting",
+ self.pushkey, self.user_id,
+ )
+
+ wait = 0
+ while self.alive:
+ try:
+ if wait > 0:
+ yield synapse.util.async.sleep(wait)
+ with Measure(self.clock, "push"):
+ yield self.get_and_dispatch()
+ wait = 0
+ except:
+ if wait == 0:
+ wait = 1
+ else:
+ wait = min(wait * 2, 1800)
+ logger.exception(
+ "Exception in pusher loop for pushkey %s. Pausing for %ds",
+ self.pushkey, wait
+ )
@defer.inlineCallbacks
def get_and_dispatch(self):
@@ -165,8 +185,8 @@ class Pusher(object):
processed = False
rule_evaluator = yield \
- push_rule_evaluator.evaluator_for_user_id_and_profile_tag(
- self.user_id, self.profile_tag, single_event['room_id'], self.store
+ evaluator_for_user_id(
+ self.user_id, single_event['room_id'], self.store
)
actions = yield rule_evaluator.actions_for_event(single_event)
@@ -296,31 +316,28 @@ class Pusher(object):
@defer.inlineCallbacks
def _get_badge_count(self):
- room_list = yield self.store.get_rooms_for_user_where_membership_is(
- user_id=self.user_id,
- membership_list=(Membership.INVITE, Membership.JOIN)
- )
+ invites, joins = yield defer.gatherResults([
+ self.store.get_invites_for_user(self.user_id),
+ self.store.get_rooms_for_user(self.user_id),
+ ], consumeErrors=True)
my_receipts_by_room = yield self.store.get_receipts_for_user(
self.user_id,
"m.read",
)
- badge = 0
+ badge = len(invites)
- for r in room_list:
- if r.membership == Membership.INVITE:
- badge += 1
- else:
- if r.room_id in my_receipts_by_room:
- last_unread_event_id = my_receipts_by_room[r.room_id]
+ for r in joins:
+ if r.room_id in my_receipts_by_room:
+ last_unread_event_id = my_receipts_by_room[r.room_id]
- notifs = yield (
- self.store.get_unread_event_push_actions_by_room_for_user(
- r.room_id, self.user_id, last_unread_event_id
- )
+ notifs = yield (
+ self.store.get_unread_event_push_actions_by_room_for_user(
+ r.room_id, self.user_id, last_unread_event_id
)
- badge += len(notifs)
+ )
+ badge += notifs["notify_count"]
defer.returnValue(badge)
diff --git a/synapse/push/action_generator.py b/synapse/push/action_generator.py
index 1d2e558f9a..84efcdd184 100644
--- a/synapse/push/action_generator.py
+++ b/synapse/push/action_generator.py
@@ -15,12 +15,10 @@
from twisted.internet import defer
-import bulk_push_rule_evaluator
+from .bulk_push_rule_evaluator import evaluator_for_room_id
import logging
-from synapse.api.constants import EventTypes
-
logger = logging.getLogger(__name__)
@@ -36,21 +34,15 @@ class ActionGenerator:
# tag (ie. we just need all the users).
@defer.inlineCallbacks
- def handle_push_actions_for_event(self, event, handler):
- if event.type == EventTypes.Redaction and event.redacts is not None:
- yield self.store.remove_push_actions_for_event_id(
- event.room_id, event.redacts
- )
-
- bulk_evaluator = yield bulk_push_rule_evaluator.evaluator_for_room_id(
+ def handle_push_actions_for_event(self, event, context, handler):
+ bulk_evaluator = yield evaluator_for_room_id(
event.room_id, self.hs, self.store
)
- actions_by_user = yield bulk_evaluator.action_for_event_by_user(event, handler)
-
- yield self.store.set_push_actions_for_event_and_users(
- event,
- [
- (uid, None, actions) for uid, actions in actions_by_user.items()
- ]
+ actions_by_user = yield bulk_evaluator.action_for_event_by_user(
+ event, handler, context.current_state
)
+
+ context.push_actions = [
+ (uid, actions) for uid, actions in actions_by_user.items()
+ ]
diff --git a/synapse/push/baserules.py b/synapse/push/baserules.py
index 186281dfa3..86a2998bcc 100644
--- a/synapse/push/baserules.py
+++ b/synapse/push/baserules.py
@@ -13,46 +13,67 @@
# limitations under the License.
from synapse.push.rulekinds import PRIORITY_CLASS_MAP, PRIORITY_CLASS_INVERSE_MAP
+import copy
def list_with_base_rules(rawrules):
+ """Combine the list of rules set by the user with the default push rules
+
+ :param list rawrules: The rules the user has modified or set.
+ :returns: A new list with the rules set by the user combined with the
+ defaults.
+ """
ruleslist = []
+ # Grab the base rules that the user has modified.
+ # The modified base rules have a priority_class of -1.
+ modified_base_rules = {
+ r['rule_id']: r for r in rawrules if r['priority_class'] < 0
+ }
+
+ # Remove the modified base rules from the list, They'll be added back
+ # in the default postions in the list.
+ rawrules = [r for r in rawrules if r['priority_class'] >= 0]
+
# shove the server default rules for each kind onto the end of each
current_prio_class = PRIORITY_CLASS_INVERSE_MAP.keys()[-1]
ruleslist.extend(make_base_prepend_rules(
- PRIORITY_CLASS_INVERSE_MAP[current_prio_class]
+ PRIORITY_CLASS_INVERSE_MAP[current_prio_class], modified_base_rules
))
for r in rawrules:
if r['priority_class'] < current_prio_class:
while r['priority_class'] < current_prio_class:
ruleslist.extend(make_base_append_rules(
- PRIORITY_CLASS_INVERSE_MAP[current_prio_class]
+ PRIORITY_CLASS_INVERSE_MAP[current_prio_class],
+ modified_base_rules,
))
current_prio_class -= 1
if current_prio_class > 0:
ruleslist.extend(make_base_prepend_rules(
- PRIORITY_CLASS_INVERSE_MAP[current_prio_class]
+ PRIORITY_CLASS_INVERSE_MAP[current_prio_class],
+ modified_base_rules,
))
ruleslist.append(r)
while current_prio_class > 0:
ruleslist.extend(make_base_append_rules(
- PRIORITY_CLASS_INVERSE_MAP[current_prio_class]
+ PRIORITY_CLASS_INVERSE_MAP[current_prio_class],
+ modified_base_rules,
))
current_prio_class -= 1
if current_prio_class > 0:
ruleslist.extend(make_base_prepend_rules(
- PRIORITY_CLASS_INVERSE_MAP[current_prio_class]
+ PRIORITY_CLASS_INVERSE_MAP[current_prio_class],
+ modified_base_rules,
))
return ruleslist
-def make_base_append_rules(kind):
+def make_base_append_rules(kind, modified_base_rules):
rules = []
if kind == 'override':
@@ -62,15 +83,31 @@ def make_base_append_rules(kind):
elif kind == 'content':
rules = BASE_APPEND_CONTENT_RULES
+ # Copy the rules before modifying them
+ rules = copy.deepcopy(rules)
+ for r in rules:
+ # Only modify the actions, keep the conditions the same.
+ modified = modified_base_rules.get(r['rule_id'])
+ if modified:
+ r['actions'] = modified['actions']
+
return rules
-def make_base_prepend_rules(kind):
+def make_base_prepend_rules(kind, modified_base_rules):
rules = []
if kind == 'override':
rules = BASE_PREPEND_OVERRIDE_RULES
+ # Copy the rules before modifying them
+ rules = copy.deepcopy(rules)
+ for r in rules:
+ # Only modify the actions, keep the conditions the same.
+ modified = modified_base_rules.get(r['rule_id'])
+ if modified:
+ r['actions'] = modified['actions']
+
return rules
@@ -173,6 +210,12 @@ BASE_APPEND_UNDERRIDE_RULES = [
'kind': 'room_member_count',
'is': '2',
'_id': 'member_count',
+ },
+ {
+ 'kind': 'event_match',
+ 'key': 'type',
+ 'pattern': 'm.room.message',
+ '_id': '_message',
}
],
'actions': [
@@ -257,18 +300,24 @@ BASE_APPEND_UNDERRIDE_RULES = [
]
+BASE_RULE_IDS = set()
+
for r in BASE_APPEND_CONTENT_RULES:
r['priority_class'] = PRIORITY_CLASS_MAP['content']
r['default'] = True
+ BASE_RULE_IDS.add(r['rule_id'])
for r in BASE_PREPEND_OVERRIDE_RULES:
r['priority_class'] = PRIORITY_CLASS_MAP['override']
r['default'] = True
+ BASE_RULE_IDS.add(r['rule_id'])
for r in BASE_APPEND_OVRRIDE_RULES:
r['priority_class'] = PRIORITY_CLASS_MAP['override']
r['default'] = True
+ BASE_RULE_IDS.add(r['rule_id'])
for r in BASE_APPEND_UNDERRIDE_RULES:
r['priority_class'] = PRIORITY_CLASS_MAP['underride']
r['default'] = True
+ BASE_RULE_IDS.add(r['rule_id'])
diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py
index 20c60422bf..87d5061fb0 100644
--- a/synapse/push/bulk_push_rule_evaluator.py
+++ b/synapse/push/bulk_push_rule_evaluator.py
@@ -18,8 +18,8 @@ import ujson as json
from twisted.internet import defer
-import baserules
-from push_rule_evaluator import PushRuleEvaluatorForEvent
+from .baserules import list_with_base_rules
+from .push_rule_evaluator import PushRuleEvaluatorForEvent
from synapse.api.constants import EventTypes
@@ -39,7 +39,7 @@ def _get_rules(room_id, user_ids, store):
rules_enabled_by_user = yield store.bulk_get_push_rules_enabled(user_ids)
rules_by_user = {
- uid: baserules.list_with_base_rules([
+ uid: list_with_base_rules([
decode_rule_json(rule_list)
for rule_list in rules_by_user.get(uid, [])
])
@@ -98,25 +98,21 @@ class BulkPushRuleEvaluator:
self.store = store
@defer.inlineCallbacks
- def action_for_event_by_user(self, event, handler):
+ def action_for_event_by_user(self, event, handler, current_state):
actions_by_user = {}
users_dict = yield self.store.are_guests(self.rules_by_user.keys())
- filtered_by_user = yield handler._filter_events_for_clients(
- users_dict.items(), [event]
+ filtered_by_user = yield handler.filter_events_for_clients(
+ users_dict.items(), [event], {event.event_id: current_state}
)
evaluator = PushRuleEvaluatorForEvent(event, len(self.users_in_room))
condition_cache = {}
- member_state = yield self.store.get_state_for_event(
- event.event_id,
- )
-
display_names = {}
- for ev in member_state.values():
+ for ev in current_state.values():
nm = ev.content.get("displayname", None)
if nm and ev.type == EventTypes.Member:
display_names[ev.state_key] = nm
@@ -156,7 +152,7 @@ def _condition_checker(evaluator, conditions, uid, display_name, cache):
elif res is True:
continue
- res = evaluator.matches(cond, uid, display_name, None)
+ res = evaluator.matches(cond, uid, display_name)
if _id:
cache[_id] = bool(res)
diff --git a/synapse/push/clientformat.py b/synapse/push/clientformat.py
new file mode 100644
index 0000000000..ae9db9ec2f
--- /dev/null
+++ b/synapse/push/clientformat.py
@@ -0,0 +1,112 @@
+# -*- coding: utf-8 -*-
+# Copyright 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from synapse.push.baserules import list_with_base_rules
+
+from synapse.push.rulekinds import (
+ PRIORITY_CLASS_MAP, PRIORITY_CLASS_INVERSE_MAP
+)
+
+import copy
+import simplejson as json
+
+
+def format_push_rules_for_user(user, rawrules, enabled_map):
+ """Converts a list of rawrules and a enabled map into nested dictionaries
+ to match the Matrix client-server format for push rules"""
+
+ ruleslist = []
+ for rawrule in rawrules:
+ rule = dict(rawrule)
+ rule["conditions"] = json.loads(rawrule["conditions"])
+ rule["actions"] = json.loads(rawrule["actions"])
+ ruleslist.append(rule)
+
+ # We're going to be mutating this a lot, so do a deep copy
+ ruleslist = copy.deepcopy(list_with_base_rules(ruleslist))
+
+ rules = {'global': {}, 'device': {}}
+
+ rules['global'] = _add_empty_priority_class_arrays(rules['global'])
+
+ for r in ruleslist:
+ rulearray = None
+
+ template_name = _priority_class_to_template_name(r['priority_class'])
+
+ # Remove internal stuff.
+ for c in r["conditions"]:
+ c.pop("_id", None)
+
+ pattern_type = c.pop("pattern_type", None)
+ if pattern_type == "user_id":
+ c["pattern"] = user.to_string()
+ elif pattern_type == "user_localpart":
+ c["pattern"] = user.localpart
+
+ rulearray = rules['global'][template_name]
+
+ template_rule = _rule_to_template(r)
+ if template_rule:
+ if r['rule_id'] in enabled_map:
+ template_rule['enabled'] = enabled_map[r['rule_id']]
+ elif 'enabled' in r:
+ template_rule['enabled'] = r['enabled']
+ else:
+ template_rule['enabled'] = True
+ rulearray.append(template_rule)
+
+ return rules
+
+
+def _add_empty_priority_class_arrays(d):
+ for pc in PRIORITY_CLASS_MAP.keys():
+ d[pc] = []
+ return d
+
+
+def _rule_to_template(rule):
+ unscoped_rule_id = None
+ if 'rule_id' in rule:
+ unscoped_rule_id = _rule_id_from_namespaced(rule['rule_id'])
+
+ template_name = _priority_class_to_template_name(rule['priority_class'])
+ if template_name in ['override', 'underride']:
+ templaterule = {k: rule[k] for k in ["conditions", "actions"]}
+ elif template_name in ["sender", "room"]:
+ templaterule = {'actions': rule['actions']}
+ unscoped_rule_id = rule['conditions'][0]['pattern']
+ elif template_name == 'content':
+ if len(rule["conditions"]) != 1:
+ return None
+ thecond = rule["conditions"][0]
+ if "pattern" not in thecond:
+ return None
+ templaterule = {'actions': rule['actions']}
+ templaterule["pattern"] = thecond["pattern"]
+
+ if unscoped_rule_id:
+ templaterule['rule_id'] = unscoped_rule_id
+ if 'default' in rule:
+ templaterule['default'] = rule['default']
+ return templaterule
+
+
+def _rule_id_from_namespaced(in_rule_id):
+ return in_rule_id.split('/')[-1]
+
+
+def _priority_class_to_template_name(pc):
+ return PRIORITY_CLASS_INVERSE_MAP[pc]
diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py
index cdc4494928..9be4869360 100644
--- a/synapse/push/httppusher.py
+++ b/synapse/push/httppusher.py
@@ -23,12 +23,11 @@ logger = logging.getLogger(__name__)
class HttpPusher(Pusher):
- def __init__(self, _hs, profile_tag, user_id, app_id,
+ def __init__(self, _hs, user_id, app_id,
app_display_name, device_display_name, pushkey, pushkey_ts,
data, last_token, last_success, failing_since):
super(HttpPusher, self).__init__(
_hs,
- profile_tag,
user_id,
app_id,
app_display_name,
diff --git a/synapse/push/push_rule_evaluator.py b/synapse/push/push_rule_evaluator.py
index dca018af95..51f73a5b78 100644
--- a/synapse/push/push_rule_evaluator.py
+++ b/synapse/push/push_rule_evaluator.py
@@ -15,7 +15,7 @@
from twisted.internet import defer
-import baserules
+from .baserules import list_with_base_rules
import logging
import simplejson as json
@@ -33,7 +33,7 @@ INEQUALITY_EXPR = re.compile("^([=<>]*)([0-9]*)$")
@defer.inlineCallbacks
-def evaluator_for_user_id_and_profile_tag(user_id, profile_tag, room_id, store):
+def evaluator_for_user_id(user_id, room_id, store):
rawrules = yield store.get_push_rules_for_user(user_id)
enabled_map = yield store.get_push_rules_enabled_for_user(user_id)
our_member_event = yield store.get_current_state(
@@ -43,7 +43,7 @@ def evaluator_for_user_id_and_profile_tag(user_id, profile_tag, room_id, store):
)
defer.returnValue(PushRuleEvaluator(
- user_id, profile_tag, rawrules, enabled_map,
+ user_id, rawrules, enabled_map,
room_id, our_member_event, store
))
@@ -77,10 +77,9 @@ def _room_member_count(ev, condition, room_member_count):
class PushRuleEvaluator:
DEFAULT_ACTIONS = []
- def __init__(self, user_id, profile_tag, raw_rules, enabled_map, room_id,
+ def __init__(self, user_id, raw_rules, enabled_map, room_id,
our_member_event, store):
self.user_id = user_id
- self.profile_tag = profile_tag
self.room_id = room_id
self.our_member_event = our_member_event
self.store = store
@@ -92,7 +91,7 @@ class PushRuleEvaluator:
rule['actions'] = json.loads(raw_rule['actions'])
rules.append(rule)
- self.rules = baserules.list_with_base_rules(rules)
+ self.rules = list_with_base_rules(rules)
self.enabled_map = enabled_map
@@ -152,7 +151,7 @@ class PushRuleEvaluator:
matches = True
for c in conditions:
matches = evaluator.matches(
- c, self.user_id, my_display_name, self.profile_tag
+ c, self.user_id, my_display_name
)
if not matches:
break
@@ -189,13 +188,9 @@ class PushRuleEvaluatorForEvent(object):
# Maps strings of e.g. 'content.body' -> event["content"]["body"]
self._value_cache = _flatten_dict(event)
- def matches(self, condition, user_id, display_name, profile_tag):
+ def matches(self, condition, user_id, display_name):
if condition['kind'] == 'event_match':
return self._event_match(condition, user_id)
- elif condition['kind'] == 'device':
- if 'profile_tag' not in condition:
- return True
- return condition['profile_tag'] == profile_tag
elif condition['kind'] == 'contains_display_name':
return self._contains_display_name(display_name)
elif condition['kind'] == 'room_member_count':
@@ -304,7 +299,7 @@ def _flatten_dict(d, prefix=[], result={}):
if isinstance(value, basestring):
result[".".join(prefix + [key])] = value.lower()
elif hasattr(value, "items"):
- _flatten_dict(value, prefix=(prefix+[key]), result=result)
+ _flatten_dict(value, prefix=(prefix + [key]), result=result)
return result
diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py
index d1b7c0802f..0b463c6fdb 100644
--- a/synapse/push/pusherpool.py
+++ b/synapse/push/pusherpool.py
@@ -16,8 +16,9 @@
from twisted.internet import defer
-from httppusher import HttpPusher
+from .httppusher import HttpPusher
from synapse.push import PusherConfigException
+from synapse.util.logcontext import preserve_fn
import logging
@@ -28,6 +29,7 @@ class PusherPool:
def __init__(self, _hs):
self.hs = _hs
self.store = self.hs.get_datastore()
+ self.clock = self.hs.get_clock()
self.pushers = {}
self.last_pusher_started = -1
@@ -37,8 +39,11 @@ class PusherPool:
self._start_pushers(pushers)
@defer.inlineCallbacks
- def add_pusher(self, user_id, access_token, profile_tag, kind, app_id,
- app_display_name, device_display_name, pushkey, lang, data):
+ def add_pusher(self, user_id, access_token, kind, app_id,
+ app_display_name, device_display_name, pushkey, lang, data,
+ profile_tag=""):
+ time_now_msec = self.clock.time_msec()
+
# we try to create the pusher just to validate the config: it
# will then get pulled out of the database,
# recreated, added and started: this means we have only one
@@ -46,23 +51,31 @@ class PusherPool:
self._create_pusher({
"user_name": user_id,
"kind": kind,
- "profile_tag": profile_tag,
"app_id": app_id,
"app_display_name": app_display_name,
"device_display_name": device_display_name,
"pushkey": pushkey,
- "ts": self.hs.get_clock().time_msec(),
+ "ts": time_now_msec,
"lang": lang,
"data": data,
"last_token": None,
"last_success": None,
"failing_since": None
})
- yield self._add_pusher_to_store(
- user_id, access_token, profile_tag, kind, app_id,
- app_display_name, device_display_name,
- pushkey, lang, data
+ yield self.store.add_pusher(
+ user_id=user_id,
+ access_token=access_token,
+ kind=kind,
+ app_id=app_id,
+ app_display_name=app_display_name,
+ device_display_name=device_display_name,
+ pushkey=pushkey,
+ pushkey_ts=time_now_msec,
+ lang=lang,
+ data=data,
+ profile_tag=profile_tag,
)
+ yield self._refresh_pusher(app_id, pushkey, user_id)
@defer.inlineCallbacks
def remove_pushers_by_app_id_and_pushkey_not_user(self, app_id, pushkey,
@@ -76,47 +89,27 @@ class PusherPool:
"Removing pusher for app id %s, pushkey %s, user %s",
app_id, pushkey, p['user_name']
)
- self.remove_pusher(p['app_id'], p['pushkey'], p['user_name'])
+ yield self.remove_pusher(p['app_id'], p['pushkey'], p['user_name'])
@defer.inlineCallbacks
- def remove_pushers_by_user(self, user_id):
+ def remove_pushers_by_user(self, user_id, except_token_ids=[]):
all = yield self.store.get_all_pushers()
logger.info(
- "Removing all pushers for user %s",
- user_id,
+ "Removing all pushers for user %s except access tokens ids %r",
+ user_id, except_token_ids
)
for p in all:
- if p['user_name'] == user_id:
+ if p['user_name'] == user_id and p['access_token'] not in except_token_ids:
logger.info(
"Removing pusher for app id %s, pushkey %s, user %s",
p['app_id'], p['pushkey'], p['user_name']
)
- self.remove_pusher(p['app_id'], p['pushkey'], p['user_name'])
-
- @defer.inlineCallbacks
- def _add_pusher_to_store(self, user_id, access_token, profile_tag, kind,
- app_id, app_display_name, device_display_name,
- pushkey, lang, data):
- yield self.store.add_pusher(
- user_id=user_id,
- access_token=access_token,
- profile_tag=profile_tag,
- kind=kind,
- app_id=app_id,
- app_display_name=app_display_name,
- device_display_name=device_display_name,
- pushkey=pushkey,
- pushkey_ts=self.hs.get_clock().time_msec(),
- lang=lang,
- data=data,
- )
- self._refresh_pusher(app_id, pushkey, user_id)
+ yield self.remove_pusher(p['app_id'], p['pushkey'], p['user_name'])
def _create_pusher(self, pusherdict):
if pusherdict['kind'] == 'http':
return HttpPusher(
self.hs,
- profile_tag=pusherdict['profile_tag'],
user_id=pusherdict['user_name'],
app_id=pusherdict['app_id'],
app_display_name=pusherdict['app_display_name'],
@@ -166,7 +159,7 @@ class PusherPool:
if fullid in self.pushers:
self.pushers[fullid].stop()
self.pushers[fullid] = p
- p.start()
+ preserve_fn(p.start)()
logger.info("Started pushers")
|