diff --git a/synapse/push/__init__.py b/synapse/push/__init__.py
index 0727f772a5..5575c847f9 100644
--- a/synapse/push/__init__.py
+++ b/synapse/push/__init__.py
@@ -253,7 +253,8 @@ class Pusher(object):
self.user_name, config, timeout=0)
self.last_token = chunk['end']
self.store.update_pusher_last_token(
- self.app_id, self.pushkey, self.last_token)
+ self.app_id, self.pushkey, self.user_name, self.last_token
+ )
logger.info("Pusher %s for user %s starting from token %s",
self.pushkey, self.user_name, self.last_token)
@@ -314,7 +315,7 @@ class Pusher(object):
pk
)
yield self.hs.get_pusherpool().remove_pusher(
- self.app_id, pk
+ self.app_id, pk, self.user_name
)
if not self.alive:
@@ -326,6 +327,7 @@ class Pusher(object):
self.store.update_pusher_last_token_and_success(
self.app_id,
self.pushkey,
+ self.user_name,
self.last_token,
self.clock.time_msec()
)
@@ -334,6 +336,7 @@ class Pusher(object):
self.store.update_pusher_failing_since(
self.app_id,
self.pushkey,
+ self.user_name,
self.failing_since)
else:
if not self.failing_since:
@@ -341,6 +344,7 @@ class Pusher(object):
self.store.update_pusher_failing_since(
self.app_id,
self.pushkey,
+ self.user_name,
self.failing_since
)
@@ -358,6 +362,7 @@ class Pusher(object):
self.store.update_pusher_last_token(
self.app_id,
self.pushkey,
+ self.user_name,
self.last_token
)
@@ -365,6 +370,7 @@ class Pusher(object):
self.store.update_pusher_failing_since(
self.app_id,
self.pushkey,
+ self.user_name,
self.failing_since
)
else:
diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py
index f75eebf8bf..cda072839c 100644
--- a/synapse/push/pusherpool.py
+++ b/synapse/push/pusherpool.py
@@ -85,6 +85,21 @@ class PusherPool:
)
@defer.inlineCallbacks
+ def remove_pushers_by_app_id_and_pushkey_not_user(self, app_id, pushkey,
+ not_user_id):
+ to_remove = yield self.store.get_pushers_by_app_id_and_pushkey(
+ app_id, pushkey
+ )
+ for p in to_remove:
+ if p['user_name'] != not_user_id:
+ logger.info(
+ "Removing pusher for app id %s, pushkey %s, user %s",
+ app_id, pushkey, p['user_name']
+ )
+ self.remove_pusher(p['app_id'], p['pushkey'], p['user_name'])
+
+
+ @defer.inlineCallbacks
def _add_pusher_to_store(self, user_name, access_token, profile_tag, kind,
app_id, app_display_name, device_display_name,
pushkey, lang, data):
@@ -101,7 +116,7 @@ class PusherPool:
lang=lang,
data=encode_canonical_json(data).decode("UTF-8"),
)
- self._refresh_pusher((app_id, pushkey))
+ self._refresh_pusher(app_id, pushkey, user_name)
def _create_pusher(self, pusherdict):
if pusherdict['kind'] == 'http':
@@ -126,30 +141,42 @@ class PusherPool:
)
@defer.inlineCallbacks
- def _refresh_pusher(self, app_id_pushkey):
- p = yield self.store.get_pushers_by_app_id_and_pushkey(
- app_id_pushkey
+ def _refresh_pusher(self, app_id, pushkey, user_name):
+ resultlist = yield self.store.get_pushers_by_app_id_and_pushkey(
+ app_id, pushkey
)
- p['data'] = json.loads(p['data'])
+ p = None
+ for r in resultlist:
+ if r['user_name'] == user_name:
+ p = r
- self._start_pushers([p])
+ if p:
+ p['data'] = json.loads(p['data'])
+
+ self._start_pushers([p])
def _start_pushers(self, pushers):
logger.info("Starting %d pushers", len(pushers))
for pusherdict in pushers:
p = self._create_pusher(pusherdict)
if p:
- fullid = "%s:%s" % (pusherdict['app_id'], pusherdict['pushkey'])
+ fullid = "%s:%s:%s" % (
+ pusherdict['app_id'],
+ pusherdict['pushkey'],
+ pusherdict['user_name']
+ )
if fullid in self.pushers:
self.pushers[fullid].stop()
self.pushers[fullid] = p
p.start()
@defer.inlineCallbacks
- def remove_pusher(self, app_id, pushkey):
- fullid = "%s:%s" % (app_id, pushkey)
+ def remove_pusher(self, app_id, pushkey, user_name):
+ fullid = "%s:%s:%s" % (app_id, pushkey, user_name)
if fullid in self.pushers:
logger.info("Stopping pusher %s", fullid)
self.pushers[fullid].stop()
del self.pushers[fullid]
- yield self.store.delete_pusher_by_app_id_pushkey(app_id, pushkey)
+ yield self.store.delete_pusher_by_app_id_pushkey_user_name(
+ app_id, pushkey, user_name
+ )
diff --git a/synapse/rest/client/v1/pusher.py b/synapse/rest/client/v1/pusher.py
index 87e89c9305..c83287c028 100644
--- a/synapse/rest/client/v1/pusher.py
+++ b/synapse/rest/client/v1/pusher.py
@@ -37,7 +37,7 @@ class PusherRestServlet(ClientV1RestServlet):
and 'kind' in content and
content['kind'] is None):
yield pusher_pool.remove_pusher(
- content['app_id'], content['pushkey']
+ content['app_id'], content['pushkey'], user_name=user.to_string()
)
defer.returnValue((200, {}))
@@ -51,6 +51,17 @@ class PusherRestServlet(ClientV1RestServlet):
raise SynapseError(400, "Missing parameters: "+','.join(missing),
errcode=Codes.MISSING_PARAM)
+ append = False
+ if 'append' in content:
+ append = content['append']
+
+ if not append:
+ yield pusher_pool.remove_pushers_by_app_id_and_pushkey_not_user(
+ app_id=content['app_id'],
+ pushkey=content['pushkey'],
+ not_user_id=user.to_string()
+ )
+
try:
yield pusher_pool.add_pusher(
user_name=user.to_string(),
diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py
index 1ef8e06ac6..423878c6a0 100644
--- a/synapse/storage/pusher.py
+++ b/synapse/storage/pusher.py
@@ -27,7 +27,7 @@ logger = logging.getLogger(__name__)
class PusherStore(SQLBaseStore):
@defer.inlineCallbacks
- def get_pushers_by_app_id_and_pushkey(self, app_id_and_pushkey):
+ def get_pushers_by_app_id_and_pushkey(self, app_id, pushkey):
sql = (
"SELECT id, user_name, kind, profile_tag, app_id,"
"app_display_name, device_display_name, pushkey, ts, data, "
@@ -38,7 +38,7 @@ class PusherStore(SQLBaseStore):
rows = yield self._execute(
"get_pushers_by_app_id_and_pushkey", None, sql,
- app_id_and_pushkey[0], app_id_and_pushkey[1]
+ app_id, pushkey
)
ret = [
@@ -60,7 +60,7 @@ class PusherStore(SQLBaseStore):
for r in rows
]
- defer.returnValue(ret[0])
+ defer.returnValue(ret)
@defer.inlineCallbacks
def get_all_pushers(self):
@@ -104,9 +104,9 @@ class PusherStore(SQLBaseStore):
dict(
app_id=app_id,
pushkey=pushkey,
+ user_name=user_name,
),
dict(
- user_name=user_name,
access_token=access_token,
kind=kind,
profile_tag=profile_tag,
@@ -123,37 +123,38 @@ class PusherStore(SQLBaseStore):
raise StoreError(500, "Problem creating pusher.")
@defer.inlineCallbacks
- def delete_pusher_by_app_id_pushkey(self, app_id, pushkey):
+ def delete_pusher_by_app_id_pushkey_user_name(self, app_id, pushkey, user_name):
yield self._simple_delete_one(
PushersTable.table_name,
- {"app_id": app_id, "pushkey": pushkey},
- desc="delete_pusher_by_app_id_pushkey",
+ {"app_id": app_id, "pushkey": pushkey, 'user_name': user_name},
+ desc="delete_pusher_by_app_id_pushkey_user_name",
)
@defer.inlineCallbacks
- def update_pusher_last_token(self, app_id, pushkey, last_token):
+ def update_pusher_last_token(self, app_id, pushkey, user_name, last_token):
yield self._simple_update_one(
PushersTable.table_name,
- {'app_id': app_id, 'pushkey': pushkey},
+ {'app_id': app_id, 'pushkey': pushkey, 'user_name': user_name},
{'last_token': last_token},
desc="update_pusher_last_token",
)
@defer.inlineCallbacks
- def update_pusher_last_token_and_success(self, app_id, pushkey,
+ def update_pusher_last_token_and_success(self, app_id, pushkey, user_name,
last_token, last_success):
yield self._simple_update_one(
PushersTable.table_name,
- {'app_id': app_id, 'pushkey': pushkey},
+ {'app_id': app_id, 'pushkey': pushkey, 'user_name': user_name},
{'last_token': last_token, 'last_success': last_success},
desc="update_pusher_last_token_and_success",
)
@defer.inlineCallbacks
- def update_pusher_failing_since(self, app_id, pushkey, failing_since):
+ def update_pusher_failing_since(self, app_id, pushkey, user_name,
+ failing_since):
yield self._simple_update_one(
PushersTable.table_name,
- {'app_id': app_id, 'pushkey': pushkey},
+ {'app_id': app_id, 'pushkey': pushkey, 'user_name': user_name},
{'failing_since': failing_since},
desc="update_pusher_failing_since",
)
diff --git a/synapse/storage/schema/delta/15/v15.sql b/synapse/storage/schema/delta/15/v15.sql
index fc3e436877..f5b2a08ca4 100644
--- a/synapse/storage/schema/delta/15/v15.sql
+++ b/synapse/storage/schema/delta/15/v15.sql
@@ -1,2 +1,25 @@
-ALTER TABLE pushers ADD COLUMN access_token INTEGER DEFAULT NULL;
-
+-- Drop, copy & recreate pushers table to change unique key
+-- Also add access_token column at the same time
+CREATE TABLE IF NOT EXISTS pushers2 (
+ id INTEGER PRIMARY KEY AUTOINCREMENT,
+ user_name TEXT NOT NULL,
+ access_token INTEGER DEFAULT NULL,
+ profile_tag varchar(32) NOT NULL,
+ kind varchar(8) NOT NULL,
+ app_id varchar(64) NOT NULL,
+ app_display_name varchar(64) NOT NULL,
+ device_display_name varchar(128) NOT NULL,
+ pushkey blob NOT NULL,
+ ts BIGINT NOT NULL,
+ lang varchar(8),
+ data blob,
+ last_token TEXT,
+ last_success BIGINT,
+ failing_since BIGINT,
+ FOREIGN KEY(user_name) REFERENCES users(name),
+ UNIQUE (app_id, pushkey, user_name)
+);
+INSERT INTO pushers2 (id, user_name, profile_tag, kind, app_id, app_display_name, device_display_name, pushkey, ts, lang, data, last_token, last_success, failing_since)
+ SELECT id, user_name, profile_tag, kind, app_id, app_display_name, device_display_name, pushkey, ts, lang, data, last_token, last_success, failing_since FROM pushers;
+DROP TABLE pushers;
+ALTER TABLE pushers2 RENAME TO pushers;
|