diff --git a/synapse/push/__init__.py b/synapse/push/__init__.py
index 0727f772a5..5575c847f9 100644
--- a/synapse/push/__init__.py
+++ b/synapse/push/__init__.py
@@ -253,7 +253,8 @@ class Pusher(object):
self.user_name, config, timeout=0)
self.last_token = chunk['end']
self.store.update_pusher_last_token(
- self.app_id, self.pushkey, self.last_token)
+ self.app_id, self.pushkey, self.user_name, self.last_token
+ )
logger.info("Pusher %s for user %s starting from token %s",
self.pushkey, self.user_name, self.last_token)
@@ -314,7 +315,7 @@ class Pusher(object):
pk
)
yield self.hs.get_pusherpool().remove_pusher(
- self.app_id, pk
+ self.app_id, pk, self.user_name
)
if not self.alive:
@@ -326,6 +327,7 @@ class Pusher(object):
self.store.update_pusher_last_token_and_success(
self.app_id,
self.pushkey,
+ self.user_name,
self.last_token,
self.clock.time_msec()
)
@@ -334,6 +336,7 @@ class Pusher(object):
self.store.update_pusher_failing_since(
self.app_id,
self.pushkey,
+ self.user_name,
self.failing_since)
else:
if not self.failing_since:
@@ -341,6 +344,7 @@ class Pusher(object):
self.store.update_pusher_failing_since(
self.app_id,
self.pushkey,
+ self.user_name,
self.failing_since
)
@@ -358,6 +362,7 @@ class Pusher(object):
self.store.update_pusher_last_token(
self.app_id,
self.pushkey,
+ self.user_name,
self.last_token
)
@@ -365,6 +370,7 @@ class Pusher(object):
self.store.update_pusher_failing_since(
self.app_id,
self.pushkey,
+ self.user_name,
self.failing_since
)
else:
diff --git a/synapse/push/baserules.py b/synapse/push/baserules.py
index 60fd35fbfb..f3d1cf5c5f 100644
--- a/synapse/push/baserules.py
+++ b/synapse/push/baserules.py
@@ -1,3 +1,17 @@
+# Copyright 2015 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
from synapse.push.rulekinds import PRIORITY_CLASS_MAP, PRIORITY_CLASS_INVERSE_MAP
@@ -112,7 +126,25 @@ def make_base_prepend_override_rules():
def make_base_append_override_rules():
return [
{
- 'rule_id': 'global/override/.m.rule.call',
+ 'rule_id': 'global/override/.m.rule.suppress_notices',
+ 'conditions': [
+ {
+ 'kind': 'event_match',
+ 'key': 'content.msgtype',
+ 'pattern': 'm.notice',
+ }
+ ],
+ 'actions': [
+ 'dont_notify',
+ ]
+ }
+ ]
+
+
+def make_base_append_underride_rules(user):
+ return [
+ {
+ 'rule_id': 'global/underride/.m.rule.call',
'conditions': [
{
'kind': 'event_match',
@@ -132,19 +164,6 @@ def make_base_append_override_rules():
]
},
{
- 'rule_id': 'global/override/.m.rule.suppress_notices',
- 'conditions': [
- {
- 'kind': 'event_match',
- 'key': 'content.msgtype',
- 'pattern': 'm.notice',
- }
- ],
- 'actions': [
- 'dont_notify',
- ]
- },
- {
'rule_id': 'global/override/.m.rule.contains_display_name',
'conditions': [
{
@@ -162,7 +181,7 @@ def make_base_append_override_rules():
]
},
{
- 'rule_id': 'global/override/.m.rule.room_one_to_one',
+ 'rule_id': 'global/underride/.m.rule.room_one_to_one',
'conditions': [
{
'kind': 'room_member_count',
@@ -179,12 +198,7 @@ def make_base_append_override_rules():
'value': False
}
]
- }
- ]
-
-
-def make_base_append_underride_rules(user):
- return [
+ },
{
'rule_id': 'global/underride/.m.rule.invite_for_me',
'conditions': [
diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py
index 90babd7224..0ab2f65972 100644
--- a/synapse/push/pusherpool.py
+++ b/synapse/push/pusherpool.py
@@ -19,10 +19,7 @@ from twisted.internet import defer
from httppusher import HttpPusher
from synapse.push import PusherConfigException
-from syutil.jsonutil import encode_canonical_json
-
import logging
-import simplejson as json
logger = logging.getLogger(__name__)
@@ -52,12 +49,10 @@ class PusherPool:
@defer.inlineCallbacks
def start(self):
pushers = yield self.store.get_all_pushers()
- for p in pushers:
- p['data'] = json.loads(p['data'])
self._start_pushers(pushers)
@defer.inlineCallbacks
- def add_pusher(self, user_name, profile_tag, kind, app_id,
+ def add_pusher(self, user_name, access_token, profile_tag, kind, app_id,
app_display_name, device_display_name, pushkey, lang, data):
# we try to create the pusher just to validate the config: it
# will then get pulled out of the database,
@@ -71,7 +66,7 @@ class PusherPool:
"app_display_name": app_display_name,
"device_display_name": device_display_name,
"pushkey": pushkey,
- "pushkey_ts": self.hs.get_clock().time_msec(),
+ "ts": self.hs.get_clock().time_msec(),
"lang": lang,
"data": data,
"last_token": None,
@@ -79,17 +74,50 @@ class PusherPool:
"failing_since": None
})
yield self._add_pusher_to_store(
- user_name, profile_tag, kind, app_id,
+ user_name, access_token, profile_tag, kind, app_id,
app_display_name, device_display_name,
pushkey, lang, data
)
@defer.inlineCallbacks
- def _add_pusher_to_store(self, user_name, profile_tag, kind, app_id,
- app_display_name, device_display_name,
+ def remove_pushers_by_app_id_and_pushkey_not_user(self, app_id, pushkey,
+ not_user_id):
+ to_remove = yield self.store.get_pushers_by_app_id_and_pushkey(
+ app_id, pushkey
+ )
+ for p in to_remove:
+ if p['user_name'] != not_user_id:
+ logger.info(
+ "Removing pusher for app id %s, pushkey %s, user %s",
+ app_id, pushkey, p['user_name']
+ )
+ self.remove_pusher(p['app_id'], p['pushkey'], p['user_name'])
+
+ @defer.inlineCallbacks
+ def remove_pushers_by_user_access_token(self, user_id, not_access_token_id):
+ all = yield self.store.get_all_pushers()
+ logger.info(
+ "Removing all pushers for user %s except access token %s",
+ user_id, not_access_token_id
+ )
+ for p in all:
+ if (
+ p['user_name'] == user_id and
+ p['access_token'] != not_access_token_id
+ ):
+ logger.info(
+ "Removing pusher for app id %s, pushkey %s, user %s",
+ p['app_id'], p['pushkey'], p['user_name']
+ )
+ self.remove_pusher(p['app_id'], p['pushkey'], p['user_name'])
+
+ @defer.inlineCallbacks
+ def _add_pusher_to_store(self, user_name, access_token, profile_tag, kind,
+ app_id, app_display_name, device_display_name,
pushkey, lang, data):
yield self.store.add_pusher(
user_name=user_name,
+ access_token=access_token,
profile_tag=profile_tag,
kind=kind,
app_id=app_id,
@@ -98,9 +126,9 @@ class PusherPool:
pushkey=pushkey,
pushkey_ts=self.hs.get_clock().time_msec(),
lang=lang,
- data=encode_canonical_json(data).decode("UTF-8"),
+ data=data,
)
- self._refresh_pusher((app_id, pushkey))
+ self._refresh_pusher(app_id, pushkey, user_name)
def _create_pusher(self, pusherdict):
if pusherdict['kind'] == 'http':
@@ -112,7 +140,7 @@ class PusherPool:
app_display_name=pusherdict['app_display_name'],
device_display_name=pusherdict['device_display_name'],
pushkey=pusherdict['pushkey'],
- pushkey_ts=pusherdict['pushkey_ts'],
+ pushkey_ts=pusherdict['ts'],
data=pusherdict['data'],
last_token=pusherdict['last_token'],
last_success=pusherdict['last_success'],
@@ -125,30 +153,48 @@ class PusherPool:
)
@defer.inlineCallbacks
- def _refresh_pusher(self, app_id_pushkey):
- p = yield self.store.get_pushers_by_app_id_and_pushkey(
- app_id_pushkey
+ def _refresh_pusher(self, app_id, pushkey, user_name):
+ resultlist = yield self.store.get_pushers_by_app_id_and_pushkey(
+ app_id, pushkey
)
- p['data'] = json.loads(p['data'])
- self._start_pushers([p])
+ p = None
+ for r in resultlist:
+ if r['user_name'] == user_name:
+ p = r
+
+ if p:
+
+ self._start_pushers([p])
def _start_pushers(self, pushers):
logger.info("Starting %d pushers", len(pushers))
for pusherdict in pushers:
- p = self._create_pusher(pusherdict)
+ try:
+ p = self._create_pusher(pusherdict)
+ except PusherConfigException:
+ logger.exception("Couldn't start a pusher: caught PusherConfigException")
+ continue
if p:
- fullid = "%s:%s" % (pusherdict['app_id'], pusherdict['pushkey'])
+ fullid = "%s:%s:%s" % (
+ pusherdict['app_id'],
+ pusherdict['pushkey'],
+ pusherdict['user_name']
+ )
if fullid in self.pushers:
self.pushers[fullid].stop()
self.pushers[fullid] = p
p.start()
+ logger.info("Started pushers")
+
@defer.inlineCallbacks
- def remove_pusher(self, app_id, pushkey):
- fullid = "%s:%s" % (app_id, pushkey)
+ def remove_pusher(self, app_id, pushkey, user_name):
+ fullid = "%s:%s:%s" % (app_id, pushkey, user_name)
if fullid in self.pushers:
logger.info("Stopping pusher %s", fullid)
self.pushers[fullid].stop()
del self.pushers[fullid]
- yield self.store.delete_pusher_by_app_id_pushkey(app_id, pushkey)
+ yield self.store.delete_pusher_by_app_id_pushkey_user_name(
+ app_id, pushkey, user_name
+ )
diff --git a/synapse/push/rulekinds.py b/synapse/push/rulekinds.py
index 660aa4e10e..4c591aa638 100644
--- a/synapse/push/rulekinds.py
+++ b/synapse/push/rulekinds.py
@@ -1,3 +1,17 @@
+# Copyright 2015 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
PRIORITY_CLASS_MAP = {
'underride': 1,
'sender': 2,
|