summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
authorDavid Baker <dave@matrix.org>2015-03-25 19:06:22 +0000
committerDavid Baker <dave@matrix.org>2015-03-25 19:06:22 +0000
commitc1a256cc4c82ce746eae8e719b9aa2344fe66177 (patch)
treef2595a72adbef12770114e71dc58e2d6ab03c0ef /synapse
parentFix tests (diff)
downloadsynapse-c1a256cc4c82ce746eae8e719b9aa2344fe66177.tar.xz
Allow multiple pushers for a single app ID & pushkey, honouring the 'append' flag in the API.
Diffstat (limited to 'synapse')
-rw-r--r--synapse/push/__init__.py10
-rw-r--r--synapse/push/pusherpool.py47
-rw-r--r--synapse/rest/client/v1/pusher.py13
-rw-r--r--synapse/storage/pusher.py27
-rw-r--r--synapse/storage/schema/delta/15/v15.sql27
5 files changed, 96 insertions, 28 deletions
diff --git a/synapse/push/__init__.py b/synapse/push/__init__.py
index 0727f772a5..5575c847f9 100644
--- a/synapse/push/__init__.py
+++ b/synapse/push/__init__.py
@@ -253,7 +253,8 @@ class Pusher(object):
                 self.user_name, config, timeout=0)
             self.last_token = chunk['end']
             self.store.update_pusher_last_token(
-                self.app_id, self.pushkey, self.last_token)
+                self.app_id, self.pushkey, self.user_name, self.last_token
+            )
             logger.info("Pusher %s for user %s starting from token %s",
                         self.pushkey, self.user_name, self.last_token)
 
@@ -314,7 +315,7 @@ class Pusher(object):
                                 pk
                             )
                             yield self.hs.get_pusherpool().remove_pusher(
-                                self.app_id, pk
+                                self.app_id, pk, self.user_name
                             )
 
             if not self.alive:
@@ -326,6 +327,7 @@ class Pusher(object):
                 self.store.update_pusher_last_token_and_success(
                     self.app_id,
                     self.pushkey,
+                    self.user_name,
                     self.last_token,
                     self.clock.time_msec()
                 )
@@ -334,6 +336,7 @@ class Pusher(object):
                     self.store.update_pusher_failing_since(
                         self.app_id,
                         self.pushkey,
+                        self.user_name,
                         self.failing_since)
             else:
                 if not self.failing_since:
@@ -341,6 +344,7 @@ class Pusher(object):
                     self.store.update_pusher_failing_since(
                         self.app_id,
                         self.pushkey,
+                        self.user_name,
                         self.failing_since
                     )
 
@@ -358,6 +362,7 @@ class Pusher(object):
                     self.store.update_pusher_last_token(
                         self.app_id,
                         self.pushkey,
+                        self.user_name,
                         self.last_token
                     )
 
@@ -365,6 +370,7 @@ class Pusher(object):
                     self.store.update_pusher_failing_since(
                         self.app_id,
                         self.pushkey,
+                        self.user_name,
                         self.failing_since
                     )
                 else:
diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py
index f75eebf8bf..cda072839c 100644
--- a/synapse/push/pusherpool.py
+++ b/synapse/push/pusherpool.py
@@ -85,6 +85,21 @@ class PusherPool:
         )
 
     @defer.inlineCallbacks
+    def remove_pushers_by_app_id_and_pushkey_not_user(self, app_id, pushkey,
+                                                      not_user_id):
+        to_remove = yield self.store.get_pushers_by_app_id_and_pushkey(
+            app_id, pushkey
+        )
+        for p in to_remove:
+            if p['user_name'] != not_user_id:
+                logger.info(
+                    "Removing pusher for app id %s, pushkey %s, user %s",
+                    app_id, pushkey, p['user_name']
+                )
+                self.remove_pusher(p['app_id'], p['pushkey'], p['user_name'])
+
+
+    @defer.inlineCallbacks
     def _add_pusher_to_store(self, user_name, access_token, profile_tag, kind,
                              app_id, app_display_name, device_display_name,
                              pushkey, lang, data):
@@ -101,7 +116,7 @@ class PusherPool:
             lang=lang,
             data=encode_canonical_json(data).decode("UTF-8"),
         )
-        self._refresh_pusher((app_id, pushkey))
+        self._refresh_pusher(app_id, pushkey, user_name)
 
     def _create_pusher(self, pusherdict):
         if pusherdict['kind'] == 'http':
@@ -126,30 +141,42 @@ class PusherPool:
             )
 
     @defer.inlineCallbacks
-    def _refresh_pusher(self, app_id_pushkey):
-        p = yield self.store.get_pushers_by_app_id_and_pushkey(
-            app_id_pushkey
+    def _refresh_pusher(self, app_id, pushkey, user_name):
+        resultlist = yield self.store.get_pushers_by_app_id_and_pushkey(
+            app_id, pushkey
         )
-        p['data'] = json.loads(p['data'])
+        p = None
+        for r in resultlist:
+            if r['user_name'] == user_name:
+                p = r
 
-        self._start_pushers([p])
+        if p:
+            p['data'] = json.loads(p['data'])
+
+            self._start_pushers([p])
 
     def _start_pushers(self, pushers):
         logger.info("Starting %d pushers", len(pushers))
         for pusherdict in pushers:
             p = self._create_pusher(pusherdict)
             if p:
-                fullid = "%s:%s" % (pusherdict['app_id'], pusherdict['pushkey'])
+                fullid = "%s:%s:%s" % (
+                    pusherdict['app_id'],
+                    pusherdict['pushkey'],
+                    pusherdict['user_name']
+                )
                 if fullid in self.pushers:
                     self.pushers[fullid].stop()
                 self.pushers[fullid] = p
                 p.start()
 
     @defer.inlineCallbacks
-    def remove_pusher(self, app_id, pushkey):
-        fullid = "%s:%s" % (app_id, pushkey)
+    def remove_pusher(self, app_id, pushkey, user_name):
+        fullid = "%s:%s:%s" % (app_id, pushkey, user_name)
         if fullid in self.pushers:
             logger.info("Stopping pusher %s", fullid)
             self.pushers[fullid].stop()
             del self.pushers[fullid]
-        yield self.store.delete_pusher_by_app_id_pushkey(app_id, pushkey)
+        yield self.store.delete_pusher_by_app_id_pushkey_user_name(
+            app_id, pushkey, user_name
+        )
diff --git a/synapse/rest/client/v1/pusher.py b/synapse/rest/client/v1/pusher.py
index 87e89c9305..c83287c028 100644
--- a/synapse/rest/client/v1/pusher.py
+++ b/synapse/rest/client/v1/pusher.py
@@ -37,7 +37,7 @@ class PusherRestServlet(ClientV1RestServlet):
                 and 'kind' in content and
                 content['kind'] is None):
             yield pusher_pool.remove_pusher(
-                content['app_id'], content['pushkey']
+                content['app_id'], content['pushkey'], user_name=user.to_string()
             )
             defer.returnValue((200, {}))
 
@@ -51,6 +51,17 @@ class PusherRestServlet(ClientV1RestServlet):
             raise SynapseError(400, "Missing parameters: "+','.join(missing),
                                errcode=Codes.MISSING_PARAM)
 
+        append = False
+        if 'append' in content:
+            append = content['append']
+
+        if not append:
+            yield pusher_pool.remove_pushers_by_app_id_and_pushkey_not_user(
+                app_id=content['app_id'],
+                pushkey=content['pushkey'],
+                not_user_id=user.to_string()
+            )
+
         try:
             yield pusher_pool.add_pusher(
                 user_name=user.to_string(),
diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py
index 1ef8e06ac6..423878c6a0 100644
--- a/synapse/storage/pusher.py
+++ b/synapse/storage/pusher.py
@@ -27,7 +27,7 @@ logger = logging.getLogger(__name__)
 
 class PusherStore(SQLBaseStore):
     @defer.inlineCallbacks
-    def get_pushers_by_app_id_and_pushkey(self, app_id_and_pushkey):
+    def get_pushers_by_app_id_and_pushkey(self, app_id, pushkey):
         sql = (
             "SELECT id, user_name, kind, profile_tag, app_id,"
             "app_display_name, device_display_name, pushkey, ts, data, "
@@ -38,7 +38,7 @@ class PusherStore(SQLBaseStore):
 
         rows = yield self._execute(
             "get_pushers_by_app_id_and_pushkey", None, sql,
-            app_id_and_pushkey[0], app_id_and_pushkey[1]
+            app_id, pushkey
         )
 
         ret = [
@@ -60,7 +60,7 @@ class PusherStore(SQLBaseStore):
             for r in rows
         ]
 
-        defer.returnValue(ret[0])
+        defer.returnValue(ret)
 
     @defer.inlineCallbacks
     def get_all_pushers(self):
@@ -104,9 +104,9 @@ class PusherStore(SQLBaseStore):
                 dict(
                     app_id=app_id,
                     pushkey=pushkey,
+                    user_name=user_name,
                 ),
                 dict(
-                    user_name=user_name,
                     access_token=access_token,
                     kind=kind,
                     profile_tag=profile_tag,
@@ -123,37 +123,38 @@ class PusherStore(SQLBaseStore):
             raise StoreError(500, "Problem creating pusher.")
 
     @defer.inlineCallbacks
-    def delete_pusher_by_app_id_pushkey(self, app_id, pushkey):
+    def delete_pusher_by_app_id_pushkey_user_name(self, app_id, pushkey, user_name):
         yield self._simple_delete_one(
             PushersTable.table_name,
-            {"app_id": app_id, "pushkey": pushkey},
-            desc="delete_pusher_by_app_id_pushkey",
+            {"app_id": app_id, "pushkey": pushkey, 'user_name': user_name},
+            desc="delete_pusher_by_app_id_pushkey_user_name",
         )
 
     @defer.inlineCallbacks
-    def update_pusher_last_token(self, app_id, pushkey, last_token):
+    def update_pusher_last_token(self, app_id, pushkey, user_name, last_token):
         yield self._simple_update_one(
             PushersTable.table_name,
-            {'app_id': app_id, 'pushkey': pushkey},
+            {'app_id': app_id, 'pushkey': pushkey, 'user_name': user_name},
             {'last_token': last_token},
             desc="update_pusher_last_token",
         )
 
     @defer.inlineCallbacks
-    def update_pusher_last_token_and_success(self, app_id, pushkey,
+    def update_pusher_last_token_and_success(self, app_id, pushkey, user_name,
                                              last_token, last_success):
         yield self._simple_update_one(
             PushersTable.table_name,
-            {'app_id': app_id, 'pushkey': pushkey},
+            {'app_id': app_id, 'pushkey': pushkey, 'user_name': user_name},
             {'last_token': last_token, 'last_success': last_success},
             desc="update_pusher_last_token_and_success",
         )
 
     @defer.inlineCallbacks
-    def update_pusher_failing_since(self, app_id, pushkey, failing_since):
+    def update_pusher_failing_since(self, app_id, pushkey, user_name,
+                                    failing_since):
         yield self._simple_update_one(
             PushersTable.table_name,
-            {'app_id': app_id, 'pushkey': pushkey},
+            {'app_id': app_id, 'pushkey': pushkey, 'user_name': user_name},
             {'failing_since': failing_since},
             desc="update_pusher_failing_since",
         )
diff --git a/synapse/storage/schema/delta/15/v15.sql b/synapse/storage/schema/delta/15/v15.sql
index fc3e436877..f5b2a08ca4 100644
--- a/synapse/storage/schema/delta/15/v15.sql
+++ b/synapse/storage/schema/delta/15/v15.sql
@@ -1,2 +1,25 @@
-ALTER TABLE pushers ADD COLUMN access_token INTEGER DEFAULT NULL;
-
+-- Drop, copy & recreate pushers table to change unique key
+-- Also add access_token column at the same time
+CREATE TABLE IF NOT EXISTS pushers2 (
+  id INTEGER PRIMARY KEY AUTOINCREMENT,
+  user_name TEXT NOT NULL,
+  access_token INTEGER DEFAULT NULL,
+  profile_tag varchar(32) NOT NULL,
+  kind varchar(8) NOT NULL,
+  app_id varchar(64) NOT NULL,
+  app_display_name varchar(64) NOT NULL,
+  device_display_name varchar(128) NOT NULL,
+  pushkey blob NOT NULL,
+  ts BIGINT NOT NULL,
+  lang varchar(8),
+  data blob,
+  last_token TEXT,
+  last_success BIGINT,
+  failing_since BIGINT,
+  FOREIGN KEY(user_name) REFERENCES users(name),
+  UNIQUE (app_id, pushkey, user_name)
+);
+INSERT INTO pushers2 (id, user_name, profile_tag, kind, app_id, app_display_name, device_display_name, pushkey, ts, lang, data, last_token, last_success, failing_since)
+  SELECT id, user_name, profile_tag, kind, app_id, app_display_name, device_display_name, pushkey, ts, lang, data, last_token, last_success, failing_since FROM pushers;
+DROP TABLE pushers;
+ALTER TABLE pushers2 RENAME TO pushers;