From 74c38797601f6d7d1a02d21fc54ceb1a54629c64 Mon Sep 17 00:00:00 2001
From: David Baker <dbkr@matrix.org>
Date: Wed, 19 Nov 2014 18:20:59 +0000
Subject: Start creating a module to do generic notifications (just prints them
 to stdout currently!)

---
 synapse/storage/__init__.py         |  6 ++-
 synapse/storage/pusher.py           | 98 +++++++++++++++++++++++++++++++++++++
 synapse/storage/schema/delta/v7.sql | 28 +++++++++++
 synapse/storage/schema/pusher.sql   | 28 +++++++++++
 4 files changed, 158 insertions(+), 2 deletions(-)
 create mode 100644 synapse/storage/pusher.py
 create mode 100644 synapse/storage/schema/delta/v7.sql
 create mode 100644 synapse/storage/schema/pusher.sql

(limited to 'synapse/storage')

diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index c36d938d96..5957f938a4 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -33,6 +33,7 @@ from .stream import StreamStore
 from .transactions import TransactionStore
 from .keys import KeyStore
 from .event_federation import EventFederationStore
+from .pusher import PusherStore
 
 from .state import StateStore
 from .signatures import SignatureStore
@@ -62,12 +63,13 @@ SCHEMAS = [
     "state",
     "event_edges",
     "event_signatures",
+    "pusher"
 ]
 
 
 # Remember to update this number every time an incompatible change is made to
 # database schema files, so the users will be informed on server restarts.
-SCHEMA_VERSION = 6
+SCHEMA_VERSION = 7
 
 
 class _RollbackButIsFineException(Exception):
@@ -81,7 +83,7 @@ class DataStore(RoomMemberStore, RoomStore,
                 RegistrationStore, StreamStore, ProfileStore, FeedbackStore,
                 PresenceStore, TransactionStore,
                 DirectoryStore, KeyStore, StateStore, SignatureStore,
-                EventFederationStore, ):
+                EventFederationStore, PusherStore, ):
 
     def __init__(self, hs):
         super(DataStore, self).__init__(hs)
diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py
new file mode 100644
index 0000000000..047a5f42d9
--- /dev/null
+++ b/synapse/storage/pusher.py
@@ -0,0 +1,98 @@
+# -*- coding: utf-8 -*-
+# Copyright 2014 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import collections
+
+from ._base import SQLBaseStore, Table
+from twisted.internet import defer
+
+from sqlite3 import IntegrityError
+from synapse.api.errors import StoreError
+
+import logging
+
+logger = logging.getLogger(__name__)
+
+class PusherStore(SQLBaseStore):
+    @defer.inlineCallbacks
+    def get_all_pushers_after_id(self, min_id):
+        sql = (
+            "SELECT id, user_name, kind, app, app_display_name, device_display_name, pushkey, data, last_token "
+            "FROM pushers "
+            "WHERE id > ?"
+        )
+
+        rows = yield self._execute(None, sql, min_id)
+
+        ret = [
+            {
+                "id": r[0],
+                "user_name": r[1],
+                "kind": r[2],
+                "app": r[3],
+                "app_display_name": r[4],
+                "device_display_name": r[5],
+                "pushkey": r[6],
+                "data": r[7],
+                "last_token": r[8]
+
+            }
+            for r in rows
+        ]
+
+        defer.returnValue(ret)
+
+    @defer.inlineCallbacks
+    def add_pusher(self, user_name, kind, app, app_display_name, device_display_name, pushkey, data):
+        try:
+            yield self._simple_insert(PushersTable.table_name, dict(
+                user_name=user_name,
+                kind=kind,
+                app=app,
+                app_display_name=app_display_name,
+                device_display_name=device_display_name,
+                pushkey=pushkey,
+                data=data
+            ))
+        except IntegrityError:
+            raise StoreError(409, "Pushkey in use.")
+        except Exception as e:
+            logger.error("create_pusher with failed: %s", e)
+            raise StoreError(500, "Problem creating pusher.")
+
+    @defer.inlineCallbacks
+    def update_pusher_last_token(self, user_name, pushkey, last_token):
+        yield self._simple_update_one(PushersTable.table_name,
+                                      {'user_name': user_name, 'pushkey': pushkey},
+                                      {'last_token': last_token}
+        )
+
+
+class PushersTable(Table):
+    table_name = "pushers"
+
+    fields = [
+        "id",
+        "user_name",
+        "kind",
+        "app"
+        "app_display_name",
+        "device_display_name",
+        "pushkey",
+        "data",
+        "last_token"
+    ]
+
+    EntryType = collections.namedtuple("PusherEntry", fields)
\ No newline at end of file
diff --git a/synapse/storage/schema/delta/v7.sql b/synapse/storage/schema/delta/v7.sql
new file mode 100644
index 0000000000..7f6852485d
--- /dev/null
+++ b/synapse/storage/schema/delta/v7.sql
@@ -0,0 +1,28 @@
+/* Copyright 2014 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+-- Push notification endpoints that users have configured
+CREATE TABLE IF NOT EXISTS pushers (
+  id INTEGER PRIMARY KEY AUTOINCREMENT,
+  user_name TEXT NOT NULL,
+  kind varchar(8) NOT NULL,
+  app varchar(64) NOT NULL,
+  app_display_name varchar(64) NOT NULL,
+  device_display_name varchar(128) NOT NULL,
+  pushkey blob NOT NULL,
+  data text,
+  last_token TEXT,
+  FOREIGN KEY(user_name) REFERENCES users(name),
+  UNIQUE (user_name, pushkey)
+);
diff --git a/synapse/storage/schema/pusher.sql b/synapse/storage/schema/pusher.sql
new file mode 100644
index 0000000000..7f6852485d
--- /dev/null
+++ b/synapse/storage/schema/pusher.sql
@@ -0,0 +1,28 @@
+/* Copyright 2014 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+-- Push notification endpoints that users have configured
+CREATE TABLE IF NOT EXISTS pushers (
+  id INTEGER PRIMARY KEY AUTOINCREMENT,
+  user_name TEXT NOT NULL,
+  kind varchar(8) NOT NULL,
+  app varchar(64) NOT NULL,
+  app_display_name varchar(64) NOT NULL,
+  device_display_name varchar(128) NOT NULL,
+  pushkey blob NOT NULL,
+  data text,
+  last_token TEXT,
+  FOREIGN KEY(user_name) REFERENCES users(name),
+  UNIQUE (user_name, pushkey)
+);
-- 
cgit 1.4.1


From eb6aedf92c0fe467fd4724623262907ad78573bb Mon Sep 17 00:00:00 2001
From: David Baker <dbkr@matrix.org>
Date: Fri, 21 Nov 2014 12:21:00 +0000
Subject: More work on pushers. Attempt to do HTTP pokes. Not sure if the
 actual HTTP pokes work or not yet but the retry semantics are pretty good.

---
 synapse/http/client.py              | 19 ++++++++++++
 synapse/push/__init__.py            | 58 ++++++++++++++++++++++++++++++-------
 synapse/push/httppusher.py          | 55 ++++++++++++++++++++++++++++++++---
 synapse/push/pusherpool.py          |  8 +++--
 synapse/storage/pusher.py           | 26 ++++++++++++++---
 synapse/storage/schema/delta/v7.sql |  2 ++
 synapse/storage/schema/pusher.sql   |  2 ++
 7 files changed, 150 insertions(+), 20 deletions(-)

(limited to 'synapse/storage')

diff --git a/synapse/http/client.py b/synapse/http/client.py
index 048a428905..82e80385ce 100644
--- a/synapse/http/client.py
+++ b/synapse/http/client.py
@@ -60,6 +60,25 @@ class SimpleHttpClient(object):
 
         defer.returnValue(json.loads(body))
 
+    @defer.inlineCallbacks
+    def post_json_get_json(self, uri, post_json):
+        json_str = json.dumps(post_json)
+
+        logger.info("HTTP POST %s -> %s", json_str, uri)
+
+        response = yield self.agent.request(
+            "POST",
+            uri.encode("ascii"),
+            headers=Headers({
+                "Content-Type": ["application/json"]
+            }),
+            bodyProducer=FileBodyProducer(StringIO(json_str))
+        )
+
+        body = yield readBody(response)
+
+        defer.returnValue(json.loads(body))
+
     @defer.inlineCallbacks
     def get_json(self, uri, args={}):
         """ Get's some json from the given host and path
diff --git a/synapse/push/__init__.py b/synapse/push/__init__.py
index df0b91a8e9..a96f0f0183 100644
--- a/synapse/push/__init__.py
+++ b/synapse/push/__init__.py
@@ -26,12 +26,15 @@ logger = logging.getLogger(__name__)
 
 class Pusher(object):
     INITIAL_BACKOFF = 1000
-    MAX_BACKOFF = 10 * 60 * 1000
+    MAX_BACKOFF = 60 * 60 * 1000
+    GIVE_UP_AFTER = 24 * 60 * 60 * 1000
 
-    def __init__(self, _hs, user_name, app, app_display_name, device_display_name, pushkey, data, last_token):
+    def __init__(self, _hs, user_name, app, app_display_name, device_display_name, pushkey, data,
+                 last_token, last_success, failing_since):
         self.hs = _hs
         self.evStreamHandler = self.hs.get_handlers().event_stream_handler
         self.store = self.hs.get_datastore()
+        self.clock = self.hs.get_clock()
         self.user_name = user_name
         self.app = app
         self.app_display_name = app_display_name
@@ -40,6 +43,7 @@ class Pusher(object):
         self.data = data
         self.last_token = last_token
         self.backoff_delay = Pusher.INITIAL_BACKOFF
+        self.failing_since = None
 
     @defer.inlineCallbacks
     def start(self):
@@ -58,17 +62,51 @@ class Pusher(object):
             config = PaginationConfig(from_token=from_tok, limit='1')
             chunk = yield self.evStreamHandler.get_stream(self.user_name, config, timeout=100*365*24*60*60*1000)
 
-            if (self.dispatchPush(chunk['chunk'][0])):
+            # limiting to 1 may get 1 event plus 1 presence event, so pick out the actual event
+            singleEvent = None
+            for c in chunk['chunk']:
+                if 'event_id' in c: # Hmmm...
+                    singleEvent = c
+                    break
+            if not singleEvent:
+                continue
+
+            ret = yield self.dispatchPush(singleEvent)
+            if (ret):
                 self.backoff_delay = Pusher.INITIAL_BACKOFF
                 self.last_token = chunk['end']
-                self.store.update_pusher_last_token(self.user_name, self.pushkey, self.last_token)
+                self.store.update_pusher_last_token_and_success(self.user_name, self.pushkey,
+                                                                self.last_token, self.clock.time_msec())
+                if self.failing_since:
+                    self.failing_since = None
+                    self.store.update_pusher_failing_since(self.user_name, self.pushkey, self.failing_since)
             else:
-                logger.warn("Failed to dispatch push for user %s. Trying again in %dms",
-                            self.user_name, self.backoff_delay)
-                yield synapse.util.async.sleep(self.backoff_delay / 1000.0)
-                self.backoff_delay *=2
-                if self.backoff_delay > Pusher.MAX_BACKOFF:
-                    self.backoff_delay = Pusher.MAX_BACKOFF
+                if not self.failing_since:
+                    self.failing_since = self.clock.time_msec()
+                    self.store.update_pusher_failing_since(self.user_name, self.pushkey, self.failing_since)
+
+                if self.failing_since and self.failing_since < self.clock.time_msec() - Pusher.GIVE_UP_AFTER:
+                    # we really only give up so that if the URL gets fixed, we don't suddenly deliver a load
+                    # of old notifications.
+                    logger.warn("Giving up on a notification to user %s, pushkey %s",
+                                self.user_name, self.pushkey)
+                    self.backoff_delay = Pusher.INITIAL_BACKOFF
+                    self.last_token = chunk['end']
+                    self.store.update_pusher_last_token(self.user_name, self.pushkey, self.last_token)
+
+                    self.failing_since = None
+                    self.store.update_pusher_failing_since(self.user_name, self.pushkey, self.failing_since)
+                else:
+                    logger.warn("Failed to dispatch push for user %s (failing for %dms)."
+                                "Trying again in %dms",
+                            self.user_name,
+                            self.clock.time_msec() - self.failing_since,
+                            self.backoff_delay
+                    )
+                    yield synapse.util.async.sleep(self.backoff_delay / 1000.0)
+                    self.backoff_delay *=2
+                    if self.backoff_delay > Pusher.MAX_BACKOFF:
+                        self.backoff_delay = Pusher.MAX_BACKOFF
 
 
 class PusherConfigException(Exception):
diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py
index f3c3ca8191..33d735b974 100644
--- a/synapse/push/httppusher.py
+++ b/synapse/push/httppusher.py
@@ -14,13 +14,17 @@
 # limitations under the License.
 
 from synapse.push import Pusher, PusherConfigException
+from synapse.http.client import SimpleHttpClient
+
+from twisted.internet import defer
 
 import logging
 
 logger = logging.getLogger(__name__)
 
 class HttpPusher(Pusher):
-    def __init__(self, _hs, user_name, app, app_display_name, device_display_name, pushkey, data, last_token):
+    def __init__(self, _hs, user_name, app, app_display_name, device_display_name, pushkey, data,
+                 last_token, last_success, failing_since):
         super(HttpPusher, self).__init__(_hs,
                                          user_name,
                                          app,
@@ -28,12 +32,55 @@ class HttpPusher(Pusher):
                                          device_display_name,
                                          pushkey,
                                          data,
-                                         last_token)
+                                         last_token,
+                                         last_success,
+                                         failing_since)
         if 'url' not in data:
             raise PusherConfigException("'url' required in data for HTTP pusher")
         self.url = data['url']
+        self.httpCli = SimpleHttpClient(self.hs)
+        self.data_minus_url = {}
+        self.data_minus_url.update(self.data)
+        del self.data_minus_url['url']
+
+    def _build_notification_dict(self, event):
+        # we probably do not want to push for every presence update
+        # (we may want to be able to set up notifications when specific
+        # people sign in, but we'd want to only deliver the pertinent ones)
+        # Actually, presence events will not get this far now because we
+        # need to filter them out in the main Pusher code.
+        if 'event_id' not in event:
+            return None
+
+        return {
+           'notification': {
+               'transition' : 'new', # everything is new for now: we don't have read receipts
+               'id': event['event_id'],
+               'type': event['type'],
+               'from': event['user_id'],
+               # we may have to fetch this over federation and we can't trust it anyway: is it worth it?
+               #'fromDisplayName': 'Steve Stevington'
+           },
+           #'counts': { -- we don't mark messages as read yet so we have no way of knowing
+           #    'unread': 1,
+           #    'missedCalls': 2
+           # },
+           'devices': {
+               self.pushkey: {
+                   'data' : self.data_minus_url
+                }
+           }
+        }
 
+    @defer.inlineCallbacks
     def dispatchPush(self, event):
-        print event
-        return True
+        notificationDict = self._build_notification_dict(event)
+        if not notificationDict:
+            defer.returnValue(True)
+        try:
+            yield self.httpCli.post_json_get_json(self.url, notificationDict)
+        except:
+            logger.exception("Failed to push %s ", self.url)
+            defer.returnValue(False)
+        defer.returnValue(True)
 
diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py
index 436040f123..3fa5a4c4ff 100644
--- a/synapse/push/pusherpool.py
+++ b/synapse/push/pusherpool.py
@@ -45,7 +45,9 @@ class PusherPool:
             "device_display_name": device_display_name,
             "pushkey": pushkey,
             "data": data,
-            "last_token": None
+            "last_token": None,
+            "last_success": None,
+            "failing_since": None
         })
         self._add_pusher_to_store(user_name, kind, app, app_display_name, device_display_name, pushkey, data)
 
@@ -69,7 +71,9 @@ class PusherPool:
                                device_display_name=pusherdict['device_display_name'],
                                pushkey=pusherdict['pushkey'],
                                data=pusherdict['data'],
-                               last_token=pusherdict['last_token']
+                               last_token=pusherdict['last_token'],
+                               last_success=pusherdict['last_success'],
+                               failing_since=pusherdict['failing_since']
                                )
         else:
             raise PusherConfigException("Unknown pusher type '%s' for user %s" %
diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py
index 047a5f42d9..ce158c4b18 100644
--- a/synapse/storage/pusher.py
+++ b/synapse/storage/pusher.py
@@ -29,7 +29,8 @@ class PusherStore(SQLBaseStore):
     @defer.inlineCallbacks
     def get_all_pushers_after_id(self, min_id):
         sql = (
-            "SELECT id, user_name, kind, app, app_display_name, device_display_name, pushkey, data, last_token "
+            "SELECT id, user_name, kind, app, app_display_name, device_display_name, pushkey, data, "
+            "last_token, last_success, failing_since "
             "FROM pushers "
             "WHERE id > ?"
         )
@@ -46,8 +47,9 @@ class PusherStore(SQLBaseStore):
                 "device_display_name": r[5],
                 "pushkey": r[6],
                 "data": r[7],
-                "last_token": r[8]
-
+                "last_token": r[8],
+                "last_success": r[9],
+                "failing_since": r[10]
             }
             for r in rows
         ]
@@ -79,6 +81,20 @@ class PusherStore(SQLBaseStore):
                                       {'last_token': last_token}
         )
 
+    @defer.inlineCallbacks
+    def update_pusher_last_token_and_success(self, user_name, pushkey, last_token, last_success):
+        yield self._simple_update_one(PushersTable.table_name,
+                                      {'user_name': user_name, 'pushkey': pushkey},
+                                      {'last_token': last_token, 'last_success': last_success}
+        )
+
+    @defer.inlineCallbacks
+    def update_pusher_failing_since(self, user_name, pushkey, failing_since):
+        yield self._simple_update_one(PushersTable.table_name,
+                                      {'user_name': user_name, 'pushkey': pushkey},
+                                      {'failing_since': failing_since}
+        )
+
 
 class PushersTable(Table):
     table_name = "pushers"
@@ -92,7 +108,9 @@ class PushersTable(Table):
         "device_display_name",
         "pushkey",
         "data",
-        "last_token"
+        "last_token",
+        "last_success",
+        "failing_since"
     ]
 
     EntryType = collections.namedtuple("PusherEntry", fields)
\ No newline at end of file
diff --git a/synapse/storage/schema/delta/v7.sql b/synapse/storage/schema/delta/v7.sql
index 7f6852485d..e83f7e7436 100644
--- a/synapse/storage/schema/delta/v7.sql
+++ b/synapse/storage/schema/delta/v7.sql
@@ -23,6 +23,8 @@ CREATE TABLE IF NOT EXISTS pushers (
   pushkey blob NOT NULL,
   data text,
   last_token TEXT,
+  last_success BIGINT,
+  failing_since BIGINT,
   FOREIGN KEY(user_name) REFERENCES users(name),
   UNIQUE (user_name, pushkey)
 );
diff --git a/synapse/storage/schema/pusher.sql b/synapse/storage/schema/pusher.sql
index 7f6852485d..e83f7e7436 100644
--- a/synapse/storage/schema/pusher.sql
+++ b/synapse/storage/schema/pusher.sql
@@ -23,6 +23,8 @@ CREATE TABLE IF NOT EXISTS pushers (
   pushkey blob NOT NULL,
   data text,
   last_token TEXT,
+  last_success BIGINT,
+  failing_since BIGINT,
   FOREIGN KEY(user_name) REFERENCES users(name),
   UNIQUE (user_name, pushkey)
 );
-- 
cgit 1.4.1


From 88af58d41d561f1d9f6bbbfb2a1e8bd00dbbe638 Mon Sep 17 00:00:00 2001
From: David Baker <dbkr@matrix.org>
Date: Wed, 3 Dec 2014 13:37:02 +0000
Subject: Update to app_id / app_instance_id (partially) and mangle to be PEP8
 compliant.

---
 synapse/push/__init__.py            | 97 +++++++++++++++++++++++++------------
 synapse/push/httppusher.py          | 75 +++++++++++++++-------------
 synapse/push/pusherpool.py          | 75 +++++++++++++++++-----------
 synapse/rest/pusher.py              | 32 +++++++-----
 synapse/storage/pusher.py           | 54 ++++++++++++---------
 synapse/storage/schema/delta/v7.sql |  5 +-
 synapse/storage/schema/pusher.sql   |  5 +-
 7 files changed, 213 insertions(+), 130 deletions(-)

(limited to 'synapse/storage')

diff --git a/synapse/push/__init__.py b/synapse/push/__init__.py
index a96f0f0183..5fca3bd772 100644
--- a/synapse/push/__init__.py
+++ b/synapse/push/__init__.py
@@ -24,90 +24,127 @@ import logging
 
 logger = logging.getLogger(__name__)
 
+
 class Pusher(object):
     INITIAL_BACKOFF = 1000
     MAX_BACKOFF = 60 * 60 * 1000
     GIVE_UP_AFTER = 24 * 60 * 60 * 1000
 
-    def __init__(self, _hs, user_name, app, app_display_name, device_display_name, pushkey, data,
+    def __init__(self, _hs, user_name, app_id, app_instance_id,
+                 app_display_name, device_display_name, pushkey, data,
                  last_token, last_success, failing_since):
         self.hs = _hs
         self.evStreamHandler = self.hs.get_handlers().event_stream_handler
         self.store = self.hs.get_datastore()
         self.clock = self.hs.get_clock()
         self.user_name = user_name
-        self.app = app
+        self.app_id = app_id
+        self.app_instance_id = app_instance_id
         self.app_display_name = app_display_name
         self.device_display_name = device_display_name
         self.pushkey = pushkey
         self.data = data
         self.last_token = last_token
+        self.last_success = last_success  # not actually used
         self.backoff_delay = Pusher.INITIAL_BACKOFF
-        self.failing_since = None
+        self.failing_since = failing_since
 
     @defer.inlineCallbacks
     def start(self):
         if not self.last_token:
-            # First-time setup: get a token to start from (we can't just start from no token, ie. 'now'
-            # because we need the result to be reproduceable in case we fail to dispatch the push)
+            # First-time setup: get a token to start from (we can't
+            # just start from no token, ie. 'now'
+            # because we need the result to be reproduceable in case
+            # we fail to dispatch the push)
             config = PaginationConfig(from_token=None, limit='1')
-            chunk = yield self.evStreamHandler.get_stream(self.user_name, config, timeout=0)
+            chunk = yield self.evStreamHandler.get_stream(
+                self.user_name, config, timeout=0)
             self.last_token = chunk['end']
-            self.store.update_pusher_last_token(self.user_name, self.pushkey, self.last_token)
+            self.store.update_pusher_last_token(
+                self.user_name, self.pushkey, self.last_token)
             logger.info("Pusher %s for user %s starting from token %s",
                         self.pushkey, self.user_name, self.last_token)
 
         while True:
             from_tok = StreamToken.from_string(self.last_token)
             config = PaginationConfig(from_token=from_tok, limit='1')
-            chunk = yield self.evStreamHandler.get_stream(self.user_name, config, timeout=100*365*24*60*60*1000)
+            chunk = yield self.evStreamHandler.get_stream(
+                self.user_name, config, timeout=100*365*24*60*60*1000)
 
-            # limiting to 1 may get 1 event plus 1 presence event, so pick out the actual event
-            singleEvent = None
+            # limiting to 1 may get 1 event plus 1 presence event, so
+            # pick out the actual event
+            single_event = None
             for c in chunk['chunk']:
-                if 'event_id' in c: # Hmmm...
-                    singleEvent = c
+                if 'event_id' in c:  # Hmmm...
+                    single_event = c
                     break
-            if not singleEvent:
+            if not single_event:
                 continue
 
-            ret = yield self.dispatchPush(singleEvent)
-            if (ret):
+            ret = yield self.dispatch_push(single_event)
+            if ret:
                 self.backoff_delay = Pusher.INITIAL_BACKOFF
                 self.last_token = chunk['end']
-                self.store.update_pusher_last_token_and_success(self.user_name, self.pushkey,
-                                                                self.last_token, self.clock.time_msec())
+                self.store.update_pusher_last_token_and_success(
+                    self.user_name,
+                    self.pushkey,
+                    self.last_token,
+                    self.clock.time_msec()
+                )
                 if self.failing_since:
                     self.failing_since = None
-                    self.store.update_pusher_failing_since(self.user_name, self.pushkey, self.failing_since)
+                    self.store.update_pusher_failing_since(
+                        self.user_name,
+                        self.pushkey,
+                        self.failing_since)
             else:
                 if not self.failing_since:
                     self.failing_since = self.clock.time_msec()
-                    self.store.update_pusher_failing_since(self.user_name, self.pushkey, self.failing_since)
+                    self.store.update_pusher_failing_since(
+                        self.user_name,
+                        self.pushkey,
+                        self.failing_since
+                    )
 
-                if self.failing_since and self.failing_since < self.clock.time_msec() - Pusher.GIVE_UP_AFTER:
-                    # we really only give up so that if the URL gets fixed, we don't suddenly deliver a load
+                if self.failing_since and \
+                   self.failing_since < \
+                   self.clock.time_msec() - Pusher.GIVE_UP_AFTER:
+                    # we really only give up so that if the URL gets
+                    # fixed, we don't suddenly deliver a load
                     # of old notifications.
-                    logger.warn("Giving up on a notification to user %s, pushkey %s",
+                    logger.warn("Giving up on a notification to user %s, "
+                                "pushkey %s",
                                 self.user_name, self.pushkey)
                     self.backoff_delay = Pusher.INITIAL_BACKOFF
                     self.last_token = chunk['end']
-                    self.store.update_pusher_last_token(self.user_name, self.pushkey, self.last_token)
+                    self.store.update_pusher_last_token(
+                        self.user_name,
+                        self.pushkey,
+                        self.last_token
+                    )
 
                     self.failing_since = None
-                    self.store.update_pusher_failing_since(self.user_name, self.pushkey, self.failing_since)
+                    self.store.update_pusher_failing_since(
+                        self.user_name,
+                        self.pushkey,
+                        self.failing_since
+                    )
                 else:
-                    logger.warn("Failed to dispatch push for user %s (failing for %dms)."
+                    logger.warn("Failed to dispatch push for user %s "
+                                "(failing for %dms)."
                                 "Trying again in %dms",
-                            self.user_name,
-                            self.clock.time_msec() - self.failing_since,
-                            self.backoff_delay
-                    )
+                                self.user_name,
+                                self.clock.time_msec() - self.failing_since,
+                                self.backoff_delay
+                                )
                     yield synapse.util.async.sleep(self.backoff_delay / 1000.0)
-                    self.backoff_delay *=2
+                    self.backoff_delay *= 2
                     if self.backoff_delay > Pusher.MAX_BACKOFF:
                         self.backoff_delay = Pusher.MAX_BACKOFF
 
+    def dispatch_push(self, p):
+        pass
+
 
 class PusherConfigException(Exception):
     def __init__(self, msg):
diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py
index 33d735b974..fd7fe4e39c 100644
--- a/synapse/push/httppusher.py
+++ b/synapse/push/httppusher.py
@@ -22,21 +22,28 @@ import logging
 
 logger = logging.getLogger(__name__)
 
+
 class HttpPusher(Pusher):
-    def __init__(self, _hs, user_name, app, app_display_name, device_display_name, pushkey, data,
+    def __init__(self, _hs, user_name, app_id, app_instance_id,
+                 app_display_name, device_display_name, pushkey, data,
                  last_token, last_success, failing_since):
-        super(HttpPusher, self).__init__(_hs,
-                                         user_name,
-                                         app,
-                                         app_display_name,
-                                         device_display_name,
-                                         pushkey,
-                                         data,
-                                         last_token,
-                                         last_success,
-                                         failing_since)
+        super(HttpPusher, self).__init__(
+            _hs,
+            user_name,
+            app_id,
+            app_instance_id,
+            app_display_name,
+            device_display_name,
+            pushkey,
+            data,
+            last_token,
+            last_success,
+            failing_since
+        )
         if 'url' not in data:
-            raise PusherConfigException("'url' required in data for HTTP pusher")
+            raise PusherConfigException(
+                "'url' required in data for HTTP pusher"
+            )
         self.url = data['url']
         self.httpCli = SimpleHttpClient(self.hs)
         self.data_minus_url = {}
@@ -53,34 +60,36 @@ class HttpPusher(Pusher):
             return None
 
         return {
-           'notification': {
-               'transition' : 'new', # everything is new for now: we don't have read receipts
-               'id': event['event_id'],
-               'type': event['type'],
-               'from': event['user_id'],
-               # we may have to fetch this over federation and we can't trust it anyway: is it worth it?
-               #'fromDisplayName': 'Steve Stevington'
-           },
-           #'counts': { -- we don't mark messages as read yet so we have no way of knowing
-           #    'unread': 1,
-           #    'missedCalls': 2
-           # },
-           'devices': {
-               self.pushkey: {
-                   'data' : self.data_minus_url
+            'notification': {
+                'transition': 'new',
+                # everything is new for now: we don't have read receipts
+                'id': event['event_id'],
+                'type': event['type'],
+                'from': event['user_id'],
+                # we may have to fetch this over federation and we
+                # can't trust it anyway: is it worth it?
+                #'fromDisplayName': 'Steve Stevington'
+            },
+            #'counts': { -- we don't mark messages as read yet so
+            # we have no way of knowing
+            #    'unread': 1,
+            #    'missedCalls': 2
+            # },
+            'devices': {
+                self.pushkey: {
+                    'data': self.data_minus_url
                 }
-           }
+            }
         }
 
     @defer.inlineCallbacks
-    def dispatchPush(self, event):
-        notificationDict = self._build_notification_dict(event)
-        if not notificationDict:
+    def dispatch_push(self, event):
+        notification_dict = self._build_notification_dict(event)
+        if not notification_dict:
             defer.returnValue(True)
         try:
-            yield self.httpCli.post_json_get_json(self.url, notificationDict)
+            yield self.httpCli.post_json_get_json(self.url, notification_dict)
         except:
             logger.exception("Failed to push %s ", self.url)
             defer.returnValue(False)
         defer.returnValue(True)
-
diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py
index 3fa5a4c4ff..045c36f3b7 100644
--- a/synapse/push/pusherpool.py
+++ b/synapse/push/pusherpool.py
@@ -34,13 +34,17 @@ class PusherPool:
     def start(self):
         self._pushers_added()
 
-    def add_pusher(self, user_name, kind, app, app_display_name, device_display_name, pushkey, data):
-        # we try to create the pusher just to validate the config: it will then get pulled out of the database,
-        # recreated, added and started: this means we have only one code path adding pushers.
+    def add_pusher(self, user_name, kind, app_id, app_instance_id,
+                   app_display_name, device_display_name, pushkey, data):
+        # we try to create the pusher just to validate the config: it
+        # will then get pulled out of the database,
+        # recreated, added and started: this means we have only one
+        # code path adding pushers.
         self._create_pusher({
             "user_name": user_name,
             "kind": kind,
-            "app": app,
+            "app_id": app_id,
+            "app_instance_id": app_instance_id,
             "app_display_name": app_display_name,
             "device_display_name": device_display_name,
             "pushkey": pushkey,
@@ -49,42 +53,55 @@ class PusherPool:
             "last_success": None,
             "failing_since": None
         })
-        self._add_pusher_to_store(user_name, kind, app, app_display_name, device_display_name, pushkey, data)
+        self._add_pusher_to_store(user_name, kind, app_id, app_instance_id,
+                                  app_display_name, device_display_name,
+                                  pushkey, data)
 
     @defer.inlineCallbacks
-    def _add_pusher_to_store(self, user_name, kind, app, app_display_name, device_display_name, pushkey, data):
-        yield self.store.add_pusher(user_name=user_name,
-                                 kind=kind,
-                                 app=app,
-                                 app_display_name=app_display_name,
-                                 device_display_name=device_display_name,
-                                 pushkey=pushkey,
-                                 data=json.dumps(data))
+    def _add_pusher_to_store(self, user_name, kind, app_id, app_instance_id,
+                             app_display_name, device_display_name,
+                             pushkey, data):
+        yield self.store.add_pusher(
+            user_name=user_name,
+            kind=kind,
+            app_id=app_id,
+            app_instance_id=app_instance_id,
+            app_display_name=app_display_name,
+            device_display_name=device_display_name,
+            pushkey=pushkey,
+            data=json.dumps(data)
+        )
         self._pushers_added()
 
     def _create_pusher(self, pusherdict):
         if pusherdict['kind'] == 'http':
-            return HttpPusher(self.hs,
-                               user_name=pusherdict['user_name'],
-                               app=pusherdict['app'],
-                               app_display_name=pusherdict['app_display_name'],
-                               device_display_name=pusherdict['device_display_name'],
-                               pushkey=pusherdict['pushkey'],
-                               data=pusherdict['data'],
-                               last_token=pusherdict['last_token'],
-                               last_success=pusherdict['last_success'],
-                               failing_since=pusherdict['failing_since']
-                               )
+            return HttpPusher(
+                self.hs,
+                user_name=pusherdict['user_name'],
+                app_id=pusherdict['app_id'],
+                app_instance_id=pusherdict['app_instance_id'],
+                app_display_name=pusherdict['app_display_name'],
+                device_display_name=pusherdict['device_display_name'],
+                pushkey=pusherdict['pushkey'],
+                data=pusherdict['data'],
+                last_token=pusherdict['last_token'],
+                last_success=pusherdict['last_success'],
+                failing_since=pusherdict['failing_since']
+            )
         else:
-            raise PusherConfigException("Unknown pusher type '%s' for user %s" %
-                                        (pusherdict['kind'], pusherdict['user_name']))
+            raise PusherConfigException(
+                "Unknown pusher type '%s' for user %s" %
+                (pusherdict['kind'], pusherdict['user_name'])
+            )
 
     @defer.inlineCallbacks
     def _pushers_added(self):
-        pushers = yield self.store.get_all_pushers_after_id(self.last_pusher_started)
+        pushers = yield self.store.get_all_pushers_after_id(
+            self.last_pusher_started
+        )
         for p in pushers:
             p['data'] = json.loads(p['data'])
-        if (len(pushers)):
+        if len(pushers):
             self.last_pusher_started = pushers[-1]['id']
 
         self._start_pushers(pushers)
@@ -95,4 +112,4 @@ class PusherPool:
             p = self._create_pusher(pusherdict)
             if p:
                 self.pushers.append(p)
-                p.start()
\ No newline at end of file
+                p.start()
diff --git a/synapse/rest/pusher.py b/synapse/rest/pusher.py
index 85d0d1c8cd..a39341cd8b 100644
--- a/synapse/rest/pusher.py
+++ b/synapse/rest/pusher.py
@@ -31,30 +31,37 @@ class PusherRestServlet(RestServlet):
 
         content = _parse_json(request)
 
-        reqd = ['kind', 'app', 'app_display_name', 'device_display_name', 'data']
+        reqd = ['kind', 'app_id', 'app_instance_id', 'app_display_name',
+                'device_display_name', 'data']
         missing = []
         for i in reqd:
             if i not in content:
                 missing.append(i)
         if len(missing):
-            raise SynapseError(400, "Missing parameters: "+','.join(missing), errcode=Codes.MISSING_PARAM)
+            raise SynapseError(400, "Missing parameters: "+','.join(missing),
+                               errcode=Codes.MISSING_PARAM)
 
         pusher_pool = self.hs.get_pusherpool()
         try:
-            pusher_pool.add_pusher(user_name=user.to_string(),
-                                     kind=content['kind'],
-                                     app=content['app'],
-                                     app_display_name=content['app_display_name'],
-                                     device_display_name=content['device_display_name'],
-                                     pushkey=pushkey,
-                                     data=content['data'])
+            pusher_pool.add_pusher(
+                user_name=user.to_string(),
+                kind=content['kind'],
+                app_id=content['app_id'],
+                app_instance_id=content['app_instance_id'],
+                app_display_name=content['app_display_name'],
+                device_display_name=content['device_display_name'],
+                pushkey=pushkey,
+                data=content['data']
+            )
         except PusherConfigException as pce:
-            raise SynapseError(400, "Config Error: "+pce.message, errcode=Codes.MISSING_PARAM)
+            raise SynapseError(400, "Config Error: "+pce.message,
+                               errcode=Codes.MISSING_PARAM)
 
         defer.returnValue((200, {}))
 
-    def on_OPTIONS(self, request):
-        return (200, {})
+    def on_OPTIONS(self, _):
+        return 200, {}
+
 
 # XXX: C+ped from rest/room.py - surely this should be common?
 def _parse_json(request):
@@ -67,5 +74,6 @@ def _parse_json(request):
     except ValueError:
         raise SynapseError(400, "Content not JSON.", errcode=Codes.NOT_JSON)
 
+
 def register_servlets(hs, http_server):
     PusherRestServlet(hs).register(http_server)
diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py
index ce158c4b18..a858e46f3b 100644
--- a/synapse/storage/pusher.py
+++ b/synapse/storage/pusher.py
@@ -25,11 +25,13 @@ import logging
 
 logger = logging.getLogger(__name__)
 
+
 class PusherStore(SQLBaseStore):
     @defer.inlineCallbacks
     def get_all_pushers_after_id(self, min_id):
         sql = (
-            "SELECT id, user_name, kind, app, app_display_name, device_display_name, pushkey, data, "
+            "SELECT id, user_name, kind, app_id, app_instance_id,"
+            "app_display_name, device_display_name, pushkey, data, "
             "last_token, last_success, failing_since "
             "FROM pushers "
             "WHERE id > ?"
@@ -42,14 +44,15 @@ class PusherStore(SQLBaseStore):
                 "id": r[0],
                 "user_name": r[1],
                 "kind": r[2],
-                "app": r[3],
-                "app_display_name": r[4],
-                "device_display_name": r[5],
-                "pushkey": r[6],
-                "data": r[7],
-                "last_token": r[8],
-                "last_success": r[9],
-                "failing_since": r[10]
+                "app_id": r[3],
+                "app_instance_id": r[4],
+                "app_display_name": r[5],
+                "device_display_name": r[6],
+                "pushkey": r[7],
+                "data": r[8],
+                "last_token": r[9],
+                "last_success": r[10],
+                "failing_since": r[11]
             }
             for r in rows
         ]
@@ -57,12 +60,14 @@ class PusherStore(SQLBaseStore):
         defer.returnValue(ret)
 
     @defer.inlineCallbacks
-    def add_pusher(self, user_name, kind, app, app_display_name, device_display_name, pushkey, data):
+    def add_pusher(self, user_name, kind, app_id, app_instance_id,
+                   app_display_name, device_display_name, pushkey, data):
         try:
             yield self._simple_insert(PushersTable.table_name, dict(
                 user_name=user_name,
                 kind=kind,
-                app=app,
+                app_id=app_id,
+                app_instance_id=app_instance_id,
                 app_display_name=app_display_name,
                 device_display_name=device_display_name,
                 pushkey=pushkey,
@@ -76,23 +81,27 @@ class PusherStore(SQLBaseStore):
 
     @defer.inlineCallbacks
     def update_pusher_last_token(self, user_name, pushkey, last_token):
-        yield self._simple_update_one(PushersTable.table_name,
-                                      {'user_name': user_name, 'pushkey': pushkey},
-                                      {'last_token': last_token}
+        yield self._simple_update_one(
+            PushersTable.table_name,
+            {'user_name': user_name, 'pushkey': pushkey},
+            {'last_token': last_token}
         )
 
     @defer.inlineCallbacks
-    def update_pusher_last_token_and_success(self, user_name, pushkey, last_token, last_success):
-        yield self._simple_update_one(PushersTable.table_name,
-                                      {'user_name': user_name, 'pushkey': pushkey},
-                                      {'last_token': last_token, 'last_success': last_success}
+    def update_pusher_last_token_and_success(self, user_name, pushkey,
+                                             last_token, last_success):
+        yield self._simple_update_one(
+            PushersTable.table_name,
+            {'user_name': user_name, 'pushkey': pushkey},
+            {'last_token': last_token, 'last_success': last_success}
         )
 
     @defer.inlineCallbacks
     def update_pusher_failing_since(self, user_name, pushkey, failing_since):
-        yield self._simple_update_one(PushersTable.table_name,
-                                      {'user_name': user_name, 'pushkey': pushkey},
-                                      {'failing_since': failing_since}
+        yield self._simple_update_one(
+            PushersTable.table_name,
+            {'user_name': user_name, 'pushkey': pushkey},
+            {'failing_since': failing_since}
         )
 
 
@@ -103,7 +112,8 @@ class PushersTable(Table):
         "id",
         "user_name",
         "kind",
-        "app"
+        "app_id",
+        "app_instance_id",
         "app_display_name",
         "device_display_name",
         "pushkey",
diff --git a/synapse/storage/schema/delta/v7.sql b/synapse/storage/schema/delta/v7.sql
index e83f7e7436..b60aeda756 100644
--- a/synapse/storage/schema/delta/v7.sql
+++ b/synapse/storage/schema/delta/v7.sql
@@ -17,11 +17,12 @@ CREATE TABLE IF NOT EXISTS pushers (
   id INTEGER PRIMARY KEY AUTOINCREMENT,
   user_name TEXT NOT NULL,
   kind varchar(8) NOT NULL,
-  app varchar(64) NOT NULL,
+  app_id varchar(64) NOT NULL,
+  app_instance_id varchar(64) NOT NULL,
   app_display_name varchar(64) NOT NULL,
   device_display_name varchar(128) NOT NULL,
   pushkey blob NOT NULL,
-  data text,
+  data blob,
   last_token TEXT,
   last_success BIGINT,
   failing_since BIGINT,
diff --git a/synapse/storage/schema/pusher.sql b/synapse/storage/schema/pusher.sql
index e83f7e7436..b60aeda756 100644
--- a/synapse/storage/schema/pusher.sql
+++ b/synapse/storage/schema/pusher.sql
@@ -17,11 +17,12 @@ CREATE TABLE IF NOT EXISTS pushers (
   id INTEGER PRIMARY KEY AUTOINCREMENT,
   user_name TEXT NOT NULL,
   kind varchar(8) NOT NULL,
-  app varchar(64) NOT NULL,
+  app_id varchar(64) NOT NULL,
+  app_instance_id varchar(64) NOT NULL,
   app_display_name varchar(64) NOT NULL,
   device_display_name varchar(128) NOT NULL,
   pushkey blob NOT NULL,
-  data text,
+  data blob,
   last_token TEXT,
   last_success BIGINT,
   failing_since BIGINT,
-- 
cgit 1.4.1


From 9728c305a34a1f9546d2ce0ef4c54352dc55a16d Mon Sep 17 00:00:00 2001
From: David Baker <dave@matrix.org>
Date: Thu, 18 Dec 2014 14:49:22 +0000
Subject: after a few rethinks, a working implementation of pushers.

---
 synapse/push/__init__.py            | 12 ++++--
 synapse/push/httppusher.py          | 25 +++++------
 synapse/push/pusherpool.py          | 47 +++++++++++----------
 synapse/rest/pusher.py              | 13 +++---
 synapse/storage/_base.py            | 45 ++++++++++++++++++++
 synapse/storage/pusher.py           | 83 +++++++++++++++++++++++++------------
 synapse/storage/schema/delta/v7.sql |  3 +-
 synapse/storage/schema/pusher.sql   |  3 +-
 8 files changed, 158 insertions(+), 73 deletions(-)

(limited to 'synapse/storage')

diff --git a/synapse/push/__init__.py b/synapse/push/__init__.py
index 5fca3bd772..5fe8719fe7 100644
--- a/synapse/push/__init__.py
+++ b/synapse/push/__init__.py
@@ -30,7 +30,7 @@ class Pusher(object):
     MAX_BACKOFF = 60 * 60 * 1000
     GIVE_UP_AFTER = 24 * 60 * 60 * 1000
 
-    def __init__(self, _hs, user_name, app_id, app_instance_id,
+    def __init__(self, _hs, user_name, app_id,
                  app_display_name, device_display_name, pushkey, data,
                  last_token, last_success, failing_since):
         self.hs = _hs
@@ -39,7 +39,6 @@ class Pusher(object):
         self.clock = self.hs.get_clock()
         self.user_name = user_name
         self.app_id = app_id
-        self.app_instance_id = app_instance_id
         self.app_display_name = app_display_name
         self.device_display_name = device_display_name
         self.pushkey = pushkey
@@ -48,6 +47,7 @@ class Pusher(object):
         self.last_success = last_success  # not actually used
         self.backoff_delay = Pusher.INITIAL_BACKOFF
         self.failing_since = failing_since
+        self.alive = True
 
     @defer.inlineCallbacks
     def start(self):
@@ -65,7 +65,7 @@ class Pusher(object):
             logger.info("Pusher %s for user %s starting from token %s",
                         self.pushkey, self.user_name, self.last_token)
 
-        while True:
+        while self.alive:
             from_tok = StreamToken.from_string(self.last_token)
             config = PaginationConfig(from_token=from_tok, limit='1')
             chunk = yield self.evStreamHandler.get_stream(
@@ -81,6 +81,9 @@ class Pusher(object):
             if not single_event:
                 continue
 
+            if not self.alive:
+                continue
+
             ret = yield self.dispatch_push(single_event)
             if ret:
                 self.backoff_delay = Pusher.INITIAL_BACKOFF
@@ -142,6 +145,9 @@ class Pusher(object):
                     if self.backoff_delay > Pusher.MAX_BACKOFF:
                         self.backoff_delay = Pusher.MAX_BACKOFF
 
+    def stop(self):
+        self.alive = False
+
     def dispatch_push(self, p):
         pass
 
diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py
index fd7fe4e39c..f94f673391 100644
--- a/synapse/push/httppusher.py
+++ b/synapse/push/httppusher.py
@@ -24,14 +24,13 @@ logger = logging.getLogger(__name__)
 
 
 class HttpPusher(Pusher):
-    def __init__(self, _hs, user_name, app_id, app_instance_id,
+    def __init__(self, _hs, user_name, app_id,
                  app_display_name, device_display_name, pushkey, data,
                  last_token, last_success, failing_since):
         super(HttpPusher, self).__init__(
             _hs,
             user_name,
             app_id,
-            app_instance_id,
             app_display_name,
             device_display_name,
             pushkey,
@@ -69,16 +68,18 @@ class HttpPusher(Pusher):
                 # we may have to fetch this over federation and we
                 # can't trust it anyway: is it worth it?
                 #'fromDisplayName': 'Steve Stevington'
-            },
-            #'counts': { -- we don't mark messages as read yet so
-            # we have no way of knowing
-            #    'unread': 1,
-            #    'missedCalls': 2
-            # },
-            'devices': {
-                self.pushkey: {
-                    'data': self.data_minus_url
-                }
+                #'counts': { -- we don't mark messages as read yet so
+                # we have no way of knowing
+                #    'unread': 1,
+                #    'missedCalls': 2
+                # },
+                'devices': [
+                    {
+                        'app_id': self.app_id,
+                        'pushkey': self.pushkey,
+                        'data': self.data_minus_url
+                    }
+                ]
             }
         }
 
diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py
index 045c36f3b7..d34ef3f6cf 100644
--- a/synapse/push/pusherpool.py
+++ b/synapse/push/pusherpool.py
@@ -24,17 +24,23 @@ import json
 
 logger = logging.getLogger(__name__)
 
+
 class PusherPool:
     def __init__(self, _hs):
         self.hs = _hs
         self.store = self.hs.get_datastore()
-        self.pushers = []
+        self.pushers = {}
         self.last_pusher_started = -1
 
+    @defer.inlineCallbacks
     def start(self):
-        self._pushers_added()
+        pushers = yield self.store.get_all_pushers()
+        for p in pushers:
+            p['data'] = json.loads(p['data'])
+        self._start_pushers(pushers)
 
-    def add_pusher(self, user_name, kind, app_id, app_instance_id,
+    @defer.inlineCallbacks
+    def add_pusher(self, user_name, kind, app_id,
                    app_display_name, device_display_name, pushkey, data):
         # we try to create the pusher just to validate the config: it
         # will then get pulled out of the database,
@@ -44,7 +50,6 @@ class PusherPool:
             "user_name": user_name,
             "kind": kind,
             "app_id": app_id,
-            "app_instance_id": app_instance_id,
             "app_display_name": app_display_name,
             "device_display_name": device_display_name,
             "pushkey": pushkey,
@@ -53,25 +58,26 @@ class PusherPool:
             "last_success": None,
             "failing_since": None
         })
-        self._add_pusher_to_store(user_name, kind, app_id, app_instance_id,
-                                  app_display_name, device_display_name,
-                                  pushkey, data)
+        yield self._add_pusher_to_store(
+            user_name, kind, app_id,
+            app_display_name, device_display_name,
+            pushkey, data
+        )
 
     @defer.inlineCallbacks
-    def _add_pusher_to_store(self, user_name, kind, app_id, app_instance_id,
+    def _add_pusher_to_store(self, user_name, kind, app_id,
                              app_display_name, device_display_name,
                              pushkey, data):
         yield self.store.add_pusher(
             user_name=user_name,
             kind=kind,
             app_id=app_id,
-            app_instance_id=app_instance_id,
             app_display_name=app_display_name,
             device_display_name=device_display_name,
             pushkey=pushkey,
             data=json.dumps(data)
         )
-        self._pushers_added()
+        self._refresh_pusher((app_id, pushkey))
 
     def _create_pusher(self, pusherdict):
         if pusherdict['kind'] == 'http':
@@ -79,7 +85,6 @@ class PusherPool:
                 self.hs,
                 user_name=pusherdict['user_name'],
                 app_id=pusherdict['app_id'],
-                app_instance_id=pusherdict['app_instance_id'],
                 app_display_name=pusherdict['app_display_name'],
                 device_display_name=pusherdict['device_display_name'],
                 pushkey=pusherdict['pushkey'],
@@ -95,21 +100,21 @@ class PusherPool:
             )
 
     @defer.inlineCallbacks
-    def _pushers_added(self):
-        pushers = yield self.store.get_all_pushers_after_id(
-            self.last_pusher_started
+    def _refresh_pusher(self, app_id_pushkey):
+        p = yield self.store.get_pushers_by_app_id_and_pushkey(
+            app_id_pushkey
         )
-        for p in pushers:
-            p['data'] = json.loads(p['data'])
-        if len(pushers):
-            self.last_pusher_started = pushers[-1]['id']
+        p['data'] = json.loads(p['data'])
 
-        self._start_pushers(pushers)
+        self._start_pushers([p])
 
     def _start_pushers(self, pushers):
-        logger.info("Starting %d pushers", (len(pushers)))
+        logger.info("Starting %d pushers", len(pushers))
         for pusherdict in pushers:
             p = self._create_pusher(pusherdict)
             if p:
-                self.pushers.append(p)
+                fullid = "%s:%s" % (pusherdict['app_id'], pusherdict['pushkey'])
+                if fullid in self.pushers:
+                    self.pushers[fullid].stop()
+                self.pushers[fullid] = p
                 p.start()
diff --git a/synapse/rest/pusher.py b/synapse/rest/pusher.py
index a39341cd8b..5b371318d0 100644
--- a/synapse/rest/pusher.py
+++ b/synapse/rest/pusher.py
@@ -23,16 +23,16 @@ import json
 
 
 class PusherRestServlet(RestServlet):
-    PATTERN = client_path_pattern("/pushers/(?P<pushkey>[\w]*)$")
+    PATTERN = client_path_pattern("/pushers/set$")
 
     @defer.inlineCallbacks
-    def on_PUT(self, request, pushkey):
+    def on_POST(self, request):
         user = yield self.auth.get_user_by_req(request)
 
         content = _parse_json(request)
 
-        reqd = ['kind', 'app_id', 'app_instance_id', 'app_display_name',
-                'device_display_name', 'data']
+        reqd = ['kind', 'app_id', 'app_display_name',
+                'device_display_name', 'pushkey', 'data']
         missing = []
         for i in reqd:
             if i not in content:
@@ -43,14 +43,13 @@ class PusherRestServlet(RestServlet):
 
         pusher_pool = self.hs.get_pusherpool()
         try:
-            pusher_pool.add_pusher(
+            yield pusher_pool.add_pusher(
                 user_name=user.to_string(),
                 kind=content['kind'],
                 app_id=content['app_id'],
-                app_instance_id=content['app_instance_id'],
                 app_display_name=content['app_display_name'],
                 device_display_name=content['device_display_name'],
-                pushkey=pushkey,
+                pushkey=content['pushkey'],
                 data=content['data']
             )
         except PusherConfigException as pce:
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 4881f03368..eb8cc4a9f3 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -195,6 +195,51 @@ class SQLBaseStore(object):
         txn.execute(sql, values.values())
         return txn.lastrowid
 
+    def _simple_upsert(self, table, keyvalues, values):
+        """
+        :param table: The table to upsert into
+        :param keyvalues: Dict of the unique key tables and their new values
+        :param values: Dict of all the nonunique columns and their new values
+        :return: A deferred
+        """
+        return self.runInteraction(
+            "_simple_upsert",
+            self._simple_upsert_txn, table, keyvalues, values
+        )
+
+    def _simple_upsert_txn(self, txn, table, keyvalues, values):
+        # Try to update
+        sql = "UPDATE %s SET %s WHERE %s" % (
+            table,
+            ", ".join("%s = ?" % (k) for k in values),
+            " AND ".join("%s = ?" % (k) for k in keyvalues)
+        )
+        sqlargs = values.values() + keyvalues.values()
+        logger.debug(
+            "[SQL] %s Args=%s",
+            sql, sqlargs,
+        )
+
+        txn.execute(sql, sqlargs)
+        if txn.rowcount == 0:
+            # We didn't update and rows so insert a new one
+            allvalues = {}
+            allvalues.update(keyvalues)
+            allvalues.update(values)
+
+            sql = "INSERT INTO %s (%s) VALUES (%s)" % (
+                table,
+                ", ".join(k for k in allvalues),
+                ", ".join("?" for _ in allvalues)
+            )
+            logger.debug(
+                "[SQL] %s Args=%s",
+                sql, keyvalues.values(),
+            )
+            txn.execute(sql, allvalues.values())
+
+
+
     def _simple_select_one(self, table, keyvalues, retcols,
                            allow_none=False):
         """Executes a SELECT query on the named table, which is expected to
diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py
index a858e46f3b..deabd9cd2e 100644
--- a/synapse/storage/pusher.py
+++ b/synapse/storage/pusher.py
@@ -28,16 +28,48 @@ logger = logging.getLogger(__name__)
 
 class PusherStore(SQLBaseStore):
     @defer.inlineCallbacks
-    def get_all_pushers_after_id(self, min_id):
+    def get_pushers_by_app_id_and_pushkey(self, app_id_and_pushkey):
         sql = (
-            "SELECT id, user_name, kind, app_id, app_instance_id,"
+            "SELECT id, user_name, kind, app_id,"
             "app_display_name, device_display_name, pushkey, data, "
             "last_token, last_success, failing_since "
             "FROM pushers "
-            "WHERE id > ?"
+            "WHERE app_id = ? AND pushkey = ?"
         )
 
-        rows = yield self._execute(None, sql, min_id)
+        rows = yield self._execute(
+            None, sql, app_id_and_pushkey[0], app_id_and_pushkey[1]
+        )
+
+        ret = [
+            {
+                "id": r[0],
+                "user_name": r[1],
+                "kind": r[2],
+                "app_id": r[3],
+                "app_display_name": r[4],
+                "device_display_name": r[5],
+                "pushkey": r[6],
+                "data": r[7],
+                "last_token": r[8],
+                "last_success": r[9],
+                "failing_since": r[10]
+            }
+            for r in rows
+        ]
+
+        defer.returnValue(ret[0])
+
+    @defer.inlineCallbacks
+    def get_all_pushers(self):
+        sql = (
+            "SELECT id, user_name, kind, app_id,"
+            "app_display_name, device_display_name, pushkey, data, "
+            "last_token, last_success, failing_since "
+            "FROM pushers"
+        )
+
+        rows = yield self._execute(None, sql)
 
         ret = [
             {
@@ -45,14 +77,13 @@ class PusherStore(SQLBaseStore):
                 "user_name": r[1],
                 "kind": r[2],
                 "app_id": r[3],
-                "app_instance_id": r[4],
-                "app_display_name": r[5],
-                "device_display_name": r[6],
-                "pushkey": r[7],
-                "data": r[8],
-                "last_token": r[9],
-                "last_success": r[10],
-                "failing_since": r[11]
+                "app_display_name": r[4],
+                "device_display_name": r[5],
+                "pushkey": r[6],
+                "data": r[7],
+                "last_token": r[8],
+                "last_success": r[9],
+                "failing_since": r[10]
             }
             for r in rows
         ]
@@ -60,21 +91,22 @@ class PusherStore(SQLBaseStore):
         defer.returnValue(ret)
 
     @defer.inlineCallbacks
-    def add_pusher(self, user_name, kind, app_id, app_instance_id,
+    def add_pusher(self, user_name, kind, app_id,
                    app_display_name, device_display_name, pushkey, data):
         try:
-            yield self._simple_insert(PushersTable.table_name, dict(
-                user_name=user_name,
-                kind=kind,
-                app_id=app_id,
-                app_instance_id=app_instance_id,
-                app_display_name=app_display_name,
-                device_display_name=device_display_name,
-                pushkey=pushkey,
-                data=data
-            ))
-        except IntegrityError:
-            raise StoreError(409, "Pushkey in use.")
+            yield self._simple_upsert(
+                PushersTable.table_name,
+                dict(
+                    app_id=app_id,
+                    pushkey=pushkey,
+                ),
+                dict(
+                    user_name=user_name,
+                    kind=kind,
+                    app_display_name=app_display_name,
+                    device_display_name=device_display_name,
+                    data=data
+                ))
         except Exception as e:
             logger.error("create_pusher with failed: %s", e)
             raise StoreError(500, "Problem creating pusher.")
@@ -113,7 +145,6 @@ class PushersTable(Table):
         "user_name",
         "kind",
         "app_id",
-        "app_instance_id",
         "app_display_name",
         "device_display_name",
         "pushkey",
diff --git a/synapse/storage/schema/delta/v7.sql b/synapse/storage/schema/delta/v7.sql
index b60aeda756..799e48d780 100644
--- a/synapse/storage/schema/delta/v7.sql
+++ b/synapse/storage/schema/delta/v7.sql
@@ -18,7 +18,6 @@ CREATE TABLE IF NOT EXISTS pushers (
   user_name TEXT NOT NULL,
   kind varchar(8) NOT NULL,
   app_id varchar(64) NOT NULL,
-  app_instance_id varchar(64) NOT NULL,
   app_display_name varchar(64) NOT NULL,
   device_display_name varchar(128) NOT NULL,
   pushkey blob NOT NULL,
@@ -27,5 +26,5 @@ CREATE TABLE IF NOT EXISTS pushers (
   last_success BIGINT,
   failing_since BIGINT,
   FOREIGN KEY(user_name) REFERENCES users(name),
-  UNIQUE (user_name, pushkey)
+  UNIQUE (app_id, pushkey)
 );
diff --git a/synapse/storage/schema/pusher.sql b/synapse/storage/schema/pusher.sql
index b60aeda756..799e48d780 100644
--- a/synapse/storage/schema/pusher.sql
+++ b/synapse/storage/schema/pusher.sql
@@ -18,7 +18,6 @@ CREATE TABLE IF NOT EXISTS pushers (
   user_name TEXT NOT NULL,
   kind varchar(8) NOT NULL,
   app_id varchar(64) NOT NULL,
-  app_instance_id varchar(64) NOT NULL,
   app_display_name varchar(64) NOT NULL,
   device_display_name varchar(128) NOT NULL,
   pushkey blob NOT NULL,
@@ -27,5 +26,5 @@ CREATE TABLE IF NOT EXISTS pushers (
   last_success BIGINT,
   failing_since BIGINT,
   FOREIGN KEY(user_name) REFERENCES users(name),
-  UNIQUE (user_name, pushkey)
+  UNIQUE (app_id, pushkey)
 );
-- 
cgit 1.4.1


From fc7c5e9cd7e0b1e29984233249311abe5cf23735 Mon Sep 17 00:00:00 2001
From: David Baker <dave@matrix.org>
Date: Thu, 18 Dec 2014 14:51:29 +0000
Subject: Rename the pusher SQL delta to v9 which the next free one

---
 synapse/storage/schema/delta/v7.sql | 30 ------------------------------
 synapse/storage/schema/delta/v9.sql | 30 ++++++++++++++++++++++++++++++
 2 files changed, 30 insertions(+), 30 deletions(-)
 delete mode 100644 synapse/storage/schema/delta/v7.sql
 create mode 100644 synapse/storage/schema/delta/v9.sql

(limited to 'synapse/storage')

diff --git a/synapse/storage/schema/delta/v7.sql b/synapse/storage/schema/delta/v7.sql
deleted file mode 100644
index 799e48d780..0000000000
--- a/synapse/storage/schema/delta/v7.sql
+++ /dev/null
@@ -1,30 +0,0 @@
-/* Copyright 2014 OpenMarket Ltd
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
--- Push notification endpoints that users have configured
-CREATE TABLE IF NOT EXISTS pushers (
-  id INTEGER PRIMARY KEY AUTOINCREMENT,
-  user_name TEXT NOT NULL,
-  kind varchar(8) NOT NULL,
-  app_id varchar(64) NOT NULL,
-  app_display_name varchar(64) NOT NULL,
-  device_display_name varchar(128) NOT NULL,
-  pushkey blob NOT NULL,
-  data blob,
-  last_token TEXT,
-  last_success BIGINT,
-  failing_since BIGINT,
-  FOREIGN KEY(user_name) REFERENCES users(name),
-  UNIQUE (app_id, pushkey)
-);
diff --git a/synapse/storage/schema/delta/v9.sql b/synapse/storage/schema/delta/v9.sql
new file mode 100644
index 0000000000..799e48d780
--- /dev/null
+++ b/synapse/storage/schema/delta/v9.sql
@@ -0,0 +1,30 @@
+/* Copyright 2014 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+-- Push notification endpoints that users have configured
+CREATE TABLE IF NOT EXISTS pushers (
+  id INTEGER PRIMARY KEY AUTOINCREMENT,
+  user_name TEXT NOT NULL,
+  kind varchar(8) NOT NULL,
+  app_id varchar(64) NOT NULL,
+  app_display_name varchar(64) NOT NULL,
+  device_display_name varchar(128) NOT NULL,
+  pushkey blob NOT NULL,
+  data blob,
+  last_token TEXT,
+  last_success BIGINT,
+  failing_since BIGINT,
+  FOREIGN KEY(user_name) REFERENCES users(name),
+  UNIQUE (app_id, pushkey)
+);
-- 
cgit 1.4.1


From 173264b656b480a2f3634f49e78fd6093633af56 Mon Sep 17 00:00:00 2001
From: David Baker <dave@matrix.org>
Date: Thu, 18 Dec 2014 14:53:10 +0000
Subject: ...and bump SCHEMA_VERSION

---
 synapse/storage/__init__.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

(limited to 'synapse/storage')

diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index 642e5e289e..348c3b259c 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -69,7 +69,7 @@ SCHEMAS = [
 
 # Remember to update this number every time an incompatible change is made to
 # database schema files, so the users will be informed on server restarts.
-SCHEMA_VERSION = 8
+SCHEMA_VERSION = 9
 
 
 class _RollbackButIsFineException(Exception):
-- 
cgit 1.4.1


From 4c7ad50f6e50b95dfa9e0961a504e2f0d5b6921a Mon Sep 17 00:00:00 2001
From: David Baker <dave@matrix.org>
Date: Thu, 18 Dec 2014 14:55:04 +0000
Subject: Thank you, pyflakes

---
 synapse/storage/pusher.py | 1 -
 1 file changed, 1 deletion(-)

(limited to 'synapse/storage')

diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py
index deabd9cd2e..9b5170a5f7 100644
--- a/synapse/storage/pusher.py
+++ b/synapse/storage/pusher.py
@@ -18,7 +18,6 @@ import collections
 from ._base import SQLBaseStore, Table
 from twisted.internet import defer
 
-from sqlite3 import IntegrityError
 from synapse.api.errors import StoreError
 
 import logging
-- 
cgit 1.4.1


From afa953a29301dcae40606171ed4cdac90eefab63 Mon Sep 17 00:00:00 2001
From: David Baker <dave@matrix.org>
Date: Thu, 18 Dec 2014 15:11:06 +0000
Subject: schema version is now 10

---
 synapse/storage/__init__.py          |  2 +-
 synapse/storage/schema/delta/v10.sql | 30 ++++++++++++++++++++++++++++++
 synapse/storage/schema/delta/v9.sql  | 30 ------------------------------
 3 files changed, 31 insertions(+), 31 deletions(-)
 create mode 100644 synapse/storage/schema/delta/v10.sql
 delete mode 100644 synapse/storage/schema/delta/v9.sql

(limited to 'synapse/storage')

diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index 348c3b259c..ad1765e04d 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -69,7 +69,7 @@ SCHEMAS = [
 
 # Remember to update this number every time an incompatible change is made to
 # database schema files, so the users will be informed on server restarts.
-SCHEMA_VERSION = 9
+SCHEMA_VERSION = 10
 
 
 class _RollbackButIsFineException(Exception):
diff --git a/synapse/storage/schema/delta/v10.sql b/synapse/storage/schema/delta/v10.sql
new file mode 100644
index 0000000000..799e48d780
--- /dev/null
+++ b/synapse/storage/schema/delta/v10.sql
@@ -0,0 +1,30 @@
+/* Copyright 2014 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+-- Push notification endpoints that users have configured
+CREATE TABLE IF NOT EXISTS pushers (
+  id INTEGER PRIMARY KEY AUTOINCREMENT,
+  user_name TEXT NOT NULL,
+  kind varchar(8) NOT NULL,
+  app_id varchar(64) NOT NULL,
+  app_display_name varchar(64) NOT NULL,
+  device_display_name varchar(128) NOT NULL,
+  pushkey blob NOT NULL,
+  data blob,
+  last_token TEXT,
+  last_success BIGINT,
+  failing_since BIGINT,
+  FOREIGN KEY(user_name) REFERENCES users(name),
+  UNIQUE (app_id, pushkey)
+);
diff --git a/synapse/storage/schema/delta/v9.sql b/synapse/storage/schema/delta/v9.sql
deleted file mode 100644
index 799e48d780..0000000000
--- a/synapse/storage/schema/delta/v9.sql
+++ /dev/null
@@ -1,30 +0,0 @@
-/* Copyright 2014 OpenMarket Ltd
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
--- Push notification endpoints that users have configured
-CREATE TABLE IF NOT EXISTS pushers (
-  id INTEGER PRIMARY KEY AUTOINCREMENT,
-  user_name TEXT NOT NULL,
-  kind varchar(8) NOT NULL,
-  app_id varchar(64) NOT NULL,
-  app_display_name varchar(64) NOT NULL,
-  device_display_name varchar(128) NOT NULL,
-  pushkey blob NOT NULL,
-  data blob,
-  last_token TEXT,
-  last_success BIGINT,
-  failing_since BIGINT,
-  FOREIGN KEY(user_name) REFERENCES users(name),
-  UNIQUE (app_id, pushkey)
-);
-- 
cgit 1.4.1


From 2cb30767fa5e428f82c6c3ebced15d568d671c3c Mon Sep 17 00:00:00 2001
From: David Baker <dave@matrix.org>
Date: Tue, 13 Jan 2015 19:48:37 +0000
Subject: Honour the 'rejected' return from push gateways

Add a timestamp to push tokens so we know the last time they we
got them from the device. Send it to the push gateways so it can
determine whether its failure is more recent than the token.
Stop and remove pushers that have been rejected.
---
 synapse/push/__init__.py             | 37 +++++++++++++++++++++++++++++++++---
 synapse/push/httppusher.py           | 15 ++++++++++-----
 synapse/push/pusherpool.py           | 12 ++++++++++++
 synapse/storage/pusher.py            | 34 ++++++++++++++++++++++-----------
 synapse/storage/schema/delta/v10.sql |  1 +
 synapse/storage/schema/pusher.sql    |  1 +
 6 files changed, 81 insertions(+), 19 deletions(-)

(limited to 'synapse/storage')

diff --git a/synapse/push/__init__.py b/synapse/push/__init__.py
index f4795d559c..839f666390 100644
--- a/synapse/push/__init__.py
+++ b/synapse/push/__init__.py
@@ -31,8 +31,8 @@ class Pusher(object):
     GIVE_UP_AFTER = 24 * 60 * 60 * 1000
 
     def __init__(self, _hs, user_name, app_id,
-                 app_display_name, device_display_name, pushkey, data,
-                 last_token, last_success, failing_since):
+                 app_display_name, device_display_name, pushkey, pushkey_ts,
+                 data, last_token, last_success, failing_since):
         self.hs = _hs
         self.evStreamHandler = self.hs.get_handlers().event_stream_handler
         self.store = self.hs.get_datastore()
@@ -42,6 +42,7 @@ class Pusher(object):
         self.app_display_name = app_display_name
         self.device_display_name = device_display_name
         self.pushkey = pushkey
+        self.pushkey_ts = pushkey_ts
         self.data = data
         self.last_token = last_token
         self.last_success = last_success  # not actually used
@@ -98,9 +99,31 @@ class Pusher(object):
 
             processed = False
             if self._should_notify_for_event(single_event):
-                processed = yield self.dispatch_push(single_event)
+                rejected = yield self.dispatch_push(single_event)
+                if not rejected == False:
+                    processed = True
+                    for pk in rejected:
+                        if pk != self.pushkey:
+                            # for sanity, we only remove the pushkey if it
+                            # was the one we actually sent...
+                            logger.warn(
+                                ("Ignoring rejected pushkey %s because we" +
+                                "didn't send it"), (pk,)
+                            )
+                        else:
+                            logger.info(
+                                "Pushkey %s was rejected: removing",
+                                pk
+                            )
+                            yield self.hs.get_pusherpool().remove_pusher(
+                                self.app_id, pk
+                            )
             else:
                 processed = True
+
+            if not self.alive:
+                continue
+
             if processed:
                 self.backoff_delay = Pusher.INITIAL_BACKOFF
                 self.last_token = chunk['end']
@@ -165,6 +188,14 @@ class Pusher(object):
         self.alive = False
 
     def dispatch_push(self, p):
+        """
+        Overridden by implementing classes to actually deliver the notification
+        :param p: The event to notify for as a single event from the event stream
+        :return: If the notification was delivered, an array containing any
+                 pushkeys that were rejected by the push gateway.
+                 False if the notification could not be delivered (ie.
+                 should be retried).
+        """
         pass
 
 
diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py
index f94f673391..bcfa06e2ab 100644
--- a/synapse/push/httppusher.py
+++ b/synapse/push/httppusher.py
@@ -25,8 +25,8 @@ logger = logging.getLogger(__name__)
 
 class HttpPusher(Pusher):
     def __init__(self, _hs, user_name, app_id,
-                 app_display_name, device_display_name, pushkey, data,
-                 last_token, last_success, failing_since):
+                 app_display_name, device_display_name, pushkey, pushkey_ts,
+                 data, last_token, last_success, failing_since):
         super(HttpPusher, self).__init__(
             _hs,
             user_name,
@@ -34,6 +34,7 @@ class HttpPusher(Pusher):
             app_display_name,
             device_display_name,
             pushkey,
+            pushkey_ts,
             data,
             last_token,
             last_success,
@@ -77,6 +78,7 @@ class HttpPusher(Pusher):
                     {
                         'app_id': self.app_id,
                         'pushkey': self.pushkey,
+                        'pushkeyTs': long(self.pushkey_ts / 1000),
                         'data': self.data_minus_url
                     }
                 ]
@@ -87,10 +89,13 @@ class HttpPusher(Pusher):
     def dispatch_push(self, event):
         notification_dict = self._build_notification_dict(event)
         if not notification_dict:
-            defer.returnValue(True)
+            defer.returnValue([])
         try:
-            yield self.httpCli.post_json_get_json(self.url, notification_dict)
+            resp = yield self.httpCli.post_json_get_json(self.url, notification_dict)
         except:
             logger.exception("Failed to push %s ", self.url)
             defer.returnValue(False)
-        defer.returnValue(True)
+        rejected = []
+        if 'rejected' in resp:
+            rejected = resp['rejected']
+        defer.returnValue(rejected)
diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py
index d34ef3f6cf..edddc3003e 100644
--- a/synapse/push/pusherpool.py
+++ b/synapse/push/pusherpool.py
@@ -53,6 +53,7 @@ class PusherPool:
             "app_display_name": app_display_name,
             "device_display_name": device_display_name,
             "pushkey": pushkey,
+            "pushkey_ts": self.hs.get_clock().time_msec(),
             "data": data,
             "last_token": None,
             "last_success": None,
@@ -75,6 +76,7 @@ class PusherPool:
             app_display_name=app_display_name,
             device_display_name=device_display_name,
             pushkey=pushkey,
+            pushkey_ts=self.hs.get_clock().time_msec(),
             data=json.dumps(data)
         )
         self._refresh_pusher((app_id, pushkey))
@@ -88,6 +90,7 @@ class PusherPool:
                 app_display_name=pusherdict['app_display_name'],
                 device_display_name=pusherdict['device_display_name'],
                 pushkey=pusherdict['pushkey'],
+                pushkey_ts=pusherdict['pushkey_ts'],
                 data=pusherdict['data'],
                 last_token=pusherdict['last_token'],
                 last_success=pusherdict['last_success'],
@@ -118,3 +121,12 @@ class PusherPool:
                     self.pushers[fullid].stop()
                 self.pushers[fullid] = p
                 p.start()
+
+    @defer.inlineCallbacks
+    def remove_pusher(self, app_id, pushkey):
+        fullid = "%s:%s" % (app_id, pushkey)
+        if fullid in self.pushers:
+            logger.info("Stopping pusher %s", fullid)
+            self.pushers[fullid].stop()
+            del self.pushers[fullid]
+        yield self.store.delete_pusher_by_app_id_pushkey(app_id, pushkey)
\ No newline at end of file
diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py
index 9b5170a5f7..bfc4980256 100644
--- a/synapse/storage/pusher.py
+++ b/synapse/storage/pusher.py
@@ -30,7 +30,7 @@ class PusherStore(SQLBaseStore):
     def get_pushers_by_app_id_and_pushkey(self, app_id_and_pushkey):
         sql = (
             "SELECT id, user_name, kind, app_id,"
-            "app_display_name, device_display_name, pushkey, data, "
+            "app_display_name, device_display_name, pushkey, ts, data, "
             "last_token, last_success, failing_since "
             "FROM pushers "
             "WHERE app_id = ? AND pushkey = ?"
@@ -49,10 +49,11 @@ class PusherStore(SQLBaseStore):
                 "app_display_name": r[4],
                 "device_display_name": r[5],
                 "pushkey": r[6],
-                "data": r[7],
-                "last_token": r[8],
-                "last_success": r[9],
-                "failing_since": r[10]
+                "pushkey_ts": r[7],
+                "data": r[8],
+                "last_token": r[9],
+                "last_success": r[10],
+                "failing_since": r[11]
             }
             for r in rows
         ]
@@ -63,7 +64,7 @@ class PusherStore(SQLBaseStore):
     def get_all_pushers(self):
         sql = (
             "SELECT id, user_name, kind, app_id,"
-            "app_display_name, device_display_name, pushkey, data, "
+            "app_display_name, device_display_name, pushkey, ts, data, "
             "last_token, last_success, failing_since "
             "FROM pushers"
         )
@@ -79,10 +80,11 @@ class PusherStore(SQLBaseStore):
                 "app_display_name": r[4],
                 "device_display_name": r[5],
                 "pushkey": r[6],
-                "data": r[7],
-                "last_token": r[8],
-                "last_success": r[9],
-                "failing_since": r[10]
+                "pushkey_ts": r[7],
+                "data": r[8],
+                "last_token": r[9],
+                "last_success": r[10],
+                "failing_since": r[11]
             }
             for r in rows
         ]
@@ -91,7 +93,8 @@ class PusherStore(SQLBaseStore):
 
     @defer.inlineCallbacks
     def add_pusher(self, user_name, kind, app_id,
-                   app_display_name, device_display_name, pushkey, data):
+                   app_display_name, device_display_name,
+                   pushkey, pushkey_ts, data):
         try:
             yield self._simple_upsert(
                 PushersTable.table_name,
@@ -104,12 +107,20 @@ class PusherStore(SQLBaseStore):
                     kind=kind,
                     app_display_name=app_display_name,
                     device_display_name=device_display_name,
+                    ts=pushkey_ts,
                     data=data
                 ))
         except Exception as e:
             logger.error("create_pusher with failed: %s", e)
             raise StoreError(500, "Problem creating pusher.")
 
+    @defer.inlineCallbacks
+    def delete_pusher_by_app_id_pushkey(self, app_id, pushkey):
+        yield self._simple_delete_one(
+            PushersTable.table_name,
+            dict(app_id=app_id, pushkey=pushkey)
+        )
+
     @defer.inlineCallbacks
     def update_pusher_last_token(self, user_name, pushkey, last_token):
         yield self._simple_update_one(
@@ -147,6 +158,7 @@ class PushersTable(Table):
         "app_display_name",
         "device_display_name",
         "pushkey",
+        "pushkey_ts",
         "data",
         "last_token",
         "last_success",
diff --git a/synapse/storage/schema/delta/v10.sql b/synapse/storage/schema/delta/v10.sql
index 799e48d780..a991e4eb11 100644
--- a/synapse/storage/schema/delta/v10.sql
+++ b/synapse/storage/schema/delta/v10.sql
@@ -21,6 +21,7 @@ CREATE TABLE IF NOT EXISTS pushers (
   app_display_name varchar(64) NOT NULL,
   device_display_name varchar(128) NOT NULL,
   pushkey blob NOT NULL,
+  ts BIGINT NOT NULL,
   data blob,
   last_token TEXT,
   last_success BIGINT,
diff --git a/synapse/storage/schema/pusher.sql b/synapse/storage/schema/pusher.sql
index 799e48d780..a991e4eb11 100644
--- a/synapse/storage/schema/pusher.sql
+++ b/synapse/storage/schema/pusher.sql
@@ -21,6 +21,7 @@ CREATE TABLE IF NOT EXISTS pushers (
   app_display_name varchar(64) NOT NULL,
   device_display_name varchar(128) NOT NULL,
   pushkey blob NOT NULL,
+  ts BIGINT NOT NULL,
   data blob,
   last_token TEXT,
   last_success BIGINT,
-- 
cgit 1.4.1


From 2ca2dbc82183f7dbe8c01694bf1c32a8c4c4b9de Mon Sep 17 00:00:00 2001
From: David Baker <dave@matrix.org>
Date: Thu, 15 Jan 2015 16:56:18 +0000
Subject: Send room name and first alias in notification poke.

---
 synapse/push/__init__.py    | 13 +++++++++++++
 synapse/push/httppusher.py  | 16 +++++++++++++---
 synapse/storage/__init__.py | 35 +++++++++++++++++++++++++++++++++++
 3 files changed, 61 insertions(+), 3 deletions(-)

(limited to 'synapse/storage')

diff --git a/synapse/push/__init__.py b/synapse/push/__init__.py
index 9cf996fb80..5f4e833add 100644
--- a/synapse/push/__init__.py
+++ b/synapse/push/__init__.py
@@ -61,6 +61,19 @@ class Pusher(object):
             return False
         return True
 
+    @defer.inlineCallbacks
+    def get_context_for_event(self, ev):
+        name_aliases = yield self.store.get_room_name_and_aliases(
+            ev['room_id']
+        )
+
+        ctx = {'aliases': name_aliases[1]}
+        if name_aliases[0] is not None:
+            ctx['name'] = name_aliases[0]
+
+        defer.returnValue(ctx)
+
+
     @defer.inlineCallbacks
     def start(self):
         if not self.last_token:
diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py
index bcfa06e2ab..7631a741fa 100644
--- a/synapse/push/httppusher.py
+++ b/synapse/push/httppusher.py
@@ -50,6 +50,7 @@ class HttpPusher(Pusher):
         self.data_minus_url.update(self.data)
         del self.data_minus_url['url']
 
+    @defer.inlineCallbacks
     def _build_notification_dict(self, event):
         # we probably do not want to push for every presence update
         # (we may want to be able to set up notifications when specific
@@ -57,9 +58,11 @@ class HttpPusher(Pusher):
         # Actually, presence events will not get this far now because we
         # need to filter them out in the main Pusher code.
         if 'event_id' not in event:
-            return None
+            defer.returnValue(None)
+
+        ctx = yield self.get_context_for_event(event)
 
-        return {
+        d = {
             'notification': {
                 'transition': 'new',
                 # everything is new for now: we don't have read receipts
@@ -85,9 +88,16 @@ class HttpPusher(Pusher):
             }
         }
 
+        if len(ctx['aliases']):
+            d['notification']['roomAlias'] = ctx['aliases'][0]
+        if 'name' in ctx:
+            d['notification']['roomName'] = ctx['name']
+
+        defer.returnValue(d)
+
     @defer.inlineCallbacks
     def dispatch_push(self, event):
-        notification_dict = self._build_notification_dict(event)
+        notification_dict = yield self._build_notification_dict(event)
         if not notification_dict:
             defer.returnValue([])
         try:
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index fa7ad0eea8..191fe462a5 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -384,6 +384,41 @@ class DataStore(RoomMemberStore, RoomStore,
         events = yield self._parse_events(results)
         defer.returnValue(events)
 
+    @defer.inlineCallbacks
+    def get_room_name_and_aliases(self, room_id):
+        del_sql = (
+            "SELECT event_id FROM redactions WHERE redacts = e.event_id "
+            "LIMIT 1"
+        )
+
+        sql = (
+            "SELECT e.*, (%(redacted)s) AS redacted FROM events as e "
+            "INNER JOIN current_state_events as c ON e.event_id = c.event_id "
+            "INNER JOIN state_events as s ON e.event_id = s.event_id "
+            "WHERE c.room_id = ? "
+        ) % {
+            "redacted": del_sql,
+        }
+
+        sql += " AND (s.type = 'm.room.name' AND s.state_key = '')"
+        sql += " OR s.type = 'm.room.aliases'"
+        args = (room_id,)
+
+        results = yield self._execute_and_decode(sql, *args)
+
+        events = yield self._parse_events(results)
+
+        name = None
+        aliases = []
+
+        for e in events:
+            if e.type == 'm.room.name':
+                name = e.content['name']
+            elif e.type == 'm.room.aliases':
+                aliases.extend(e.content['aliases'])
+
+        defer.returnValue((name, aliases))
+
     @defer.inlineCallbacks
     def _get_min_token(self):
         row = yield self._execute(
-- 
cgit 1.4.1


From 2d2953cf5fce26625e56fc1abc230735d007ea1e Mon Sep 17 00:00:00 2001
From: David Baker <dave@matrix.org>
Date: Fri, 16 Jan 2015 11:24:10 +0000
Subject: Require device language when adding a pusher.

Because this seems like it might be useful to do sooner rather
than later.
---
 synapse/push/pusherpool.py           | 8 +++++---
 synapse/rest/pusher.py               | 3 ++-
 synapse/storage/pusher.py            | 3 ++-
 synapse/storage/schema/delta/v10.sql | 1 +
 synapse/storage/schema/pusher.sql    | 1 +
 5 files changed, 11 insertions(+), 5 deletions(-)

(limited to 'synapse/storage')

diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py
index edddc3003e..8c77f4b668 100644
--- a/synapse/push/pusherpool.py
+++ b/synapse/push/pusherpool.py
@@ -41,7 +41,7 @@ class PusherPool:
 
     @defer.inlineCallbacks
     def add_pusher(self, user_name, kind, app_id,
-                   app_display_name, device_display_name, pushkey, data):
+                   app_display_name, device_display_name, pushkey, lang, data):
         # we try to create the pusher just to validate the config: it
         # will then get pulled out of the database,
         # recreated, added and started: this means we have only one
@@ -54,6 +54,7 @@ class PusherPool:
             "device_display_name": device_display_name,
             "pushkey": pushkey,
             "pushkey_ts": self.hs.get_clock().time_msec(),
+            "lang": lang,
             "data": data,
             "last_token": None,
             "last_success": None,
@@ -62,13 +63,13 @@ class PusherPool:
         yield self._add_pusher_to_store(
             user_name, kind, app_id,
             app_display_name, device_display_name,
-            pushkey, data
+            pushkey, lang, data
         )
 
     @defer.inlineCallbacks
     def _add_pusher_to_store(self, user_name, kind, app_id,
                              app_display_name, device_display_name,
-                             pushkey, data):
+                             pushkey, lang, data):
         yield self.store.add_pusher(
             user_name=user_name,
             kind=kind,
@@ -77,6 +78,7 @@ class PusherPool:
             device_display_name=device_display_name,
             pushkey=pushkey,
             pushkey_ts=self.hs.get_clock().time_msec(),
+            lang=lang,
             data=json.dumps(data)
         )
         self._refresh_pusher((app_id, pushkey))
diff --git a/synapse/rest/pusher.py b/synapse/rest/pusher.py
index 5b371318d0..6b9a59adb6 100644
--- a/synapse/rest/pusher.py
+++ b/synapse/rest/pusher.py
@@ -32,7 +32,7 @@ class PusherRestServlet(RestServlet):
         content = _parse_json(request)
 
         reqd = ['kind', 'app_id', 'app_display_name',
-                'device_display_name', 'pushkey', 'data']
+                'device_display_name', 'pushkey', 'lang', 'data']
         missing = []
         for i in reqd:
             if i not in content:
@@ -50,6 +50,7 @@ class PusherRestServlet(RestServlet):
                 app_display_name=content['app_display_name'],
                 device_display_name=content['device_display_name'],
                 pushkey=content['pushkey'],
+                lang=content['lang'],
                 data=content['data']
             )
         except PusherConfigException as pce:
diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py
index bfc4980256..4eb30c7bdf 100644
--- a/synapse/storage/pusher.py
+++ b/synapse/storage/pusher.py
@@ -94,7 +94,7 @@ class PusherStore(SQLBaseStore):
     @defer.inlineCallbacks
     def add_pusher(self, user_name, kind, app_id,
                    app_display_name, device_display_name,
-                   pushkey, pushkey_ts, data):
+                   pushkey, pushkey_ts, lang, data):
         try:
             yield self._simple_upsert(
                 PushersTable.table_name,
@@ -108,6 +108,7 @@ class PusherStore(SQLBaseStore):
                     app_display_name=app_display_name,
                     device_display_name=device_display_name,
                     ts=pushkey_ts,
+                    lang=lang,
                     data=data
                 ))
         except Exception as e:
diff --git a/synapse/storage/schema/delta/v10.sql b/synapse/storage/schema/delta/v10.sql
index a991e4eb11..689d2dff8b 100644
--- a/synapse/storage/schema/delta/v10.sql
+++ b/synapse/storage/schema/delta/v10.sql
@@ -22,6 +22,7 @@ CREATE TABLE IF NOT EXISTS pushers (
   device_display_name varchar(128) NOT NULL,
   pushkey blob NOT NULL,
   ts BIGINT NOT NULL,
+  lang varchar(8),
   data blob,
   last_token TEXT,
   last_success BIGINT,
diff --git a/synapse/storage/schema/pusher.sql b/synapse/storage/schema/pusher.sql
index a991e4eb11..689d2dff8b 100644
--- a/synapse/storage/schema/pusher.sql
+++ b/synapse/storage/schema/pusher.sql
@@ -22,6 +22,7 @@ CREATE TABLE IF NOT EXISTS pushers (
   device_display_name varchar(128) NOT NULL,
   pushkey blob NOT NULL,
   ts BIGINT NOT NULL,
+  lang varchar(8),
   data blob,
   last_token TEXT,
   last_success BIGINT,
-- 
cgit 1.4.1


From afb714f7bebf88ac27eac018cffa2078e2723310 Mon Sep 17 00:00:00 2001
From: David Baker <dave@matrix.org>
Date: Tue, 20 Jan 2015 11:49:48 +0000
Subject: add instance_handles to pushers so we have a way to refer to them
 even if the push token changes.

---
 synapse/push/__init__.py             |  3 ++-
 synapse/push/httppusher.py           |  3 ++-
 synapse/push/pusherpool.py           |  9 ++++---
 synapse/rest/pusher.py               |  3 ++-
 synapse/storage/pusher.py            | 46 ++++++++++++++++++++----------------
 synapse/storage/schema/delta/v10.sql |  1 +
 synapse/storage/schema/pusher.sql    |  1 +
 7 files changed, 39 insertions(+), 27 deletions(-)

(limited to 'synapse/storage')

diff --git a/synapse/push/__init__.py b/synapse/push/__init__.py
index 5f4e833add..3ee652f3bc 100644
--- a/synapse/push/__init__.py
+++ b/synapse/push/__init__.py
@@ -30,13 +30,14 @@ class Pusher(object):
     MAX_BACKOFF = 60 * 60 * 1000
     GIVE_UP_AFTER = 24 * 60 * 60 * 1000
 
-    def __init__(self, _hs, user_name, app_id,
+    def __init__(self, _hs, instance_handle, user_name, app_id,
                  app_display_name, device_display_name, pushkey, pushkey_ts,
                  data, last_token, last_success, failing_since):
         self.hs = _hs
         self.evStreamHandler = self.hs.get_handlers().event_stream_handler
         self.store = self.hs.get_datastore()
         self.clock = self.hs.get_clock()
+        self.instance_handle = instance_handle,
         self.user_name = user_name
         self.app_id = app_id
         self.app_display_name = app_display_name
diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py
index 7631a741fa..9a3e0be15e 100644
--- a/synapse/push/httppusher.py
+++ b/synapse/push/httppusher.py
@@ -24,11 +24,12 @@ logger = logging.getLogger(__name__)
 
 
 class HttpPusher(Pusher):
-    def __init__(self, _hs, user_name, app_id,
+    def __init__(self, _hs, instance_handle, user_name, app_id,
                  app_display_name, device_display_name, pushkey, pushkey_ts,
                  data, last_token, last_success, failing_since):
         super(HttpPusher, self).__init__(
             _hs,
+            instance_handle,
             user_name,
             app_id,
             app_display_name,
diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py
index 8c77f4b668..2dfecf178b 100644
--- a/synapse/push/pusherpool.py
+++ b/synapse/push/pusherpool.py
@@ -40,7 +40,7 @@ class PusherPool:
         self._start_pushers(pushers)
 
     @defer.inlineCallbacks
-    def add_pusher(self, user_name, kind, app_id,
+    def add_pusher(self, user_name, instance_handle, kind, app_id,
                    app_display_name, device_display_name, pushkey, lang, data):
         # we try to create the pusher just to validate the config: it
         # will then get pulled out of the database,
@@ -49,6 +49,7 @@ class PusherPool:
         self._create_pusher({
             "user_name": user_name,
             "kind": kind,
+            "instance_handle": instance_handle,
             "app_id": app_id,
             "app_display_name": app_display_name,
             "device_display_name": device_display_name,
@@ -61,17 +62,18 @@ class PusherPool:
             "failing_since": None
         })
         yield self._add_pusher_to_store(
-            user_name, kind, app_id,
+            user_name, instance_handle, kind, app_id,
             app_display_name, device_display_name,
             pushkey, lang, data
         )
 
     @defer.inlineCallbacks
-    def _add_pusher_to_store(self, user_name, kind, app_id,
+    def _add_pusher_to_store(self, user_name, instance_handle, kind, app_id,
                              app_display_name, device_display_name,
                              pushkey, lang, data):
         yield self.store.add_pusher(
             user_name=user_name,
+            instance_handle=instance_handle,
             kind=kind,
             app_id=app_id,
             app_display_name=app_display_name,
@@ -87,6 +89,7 @@ class PusherPool:
         if pusherdict['kind'] == 'http':
             return HttpPusher(
                 self.hs,
+                instance_handle=pusherdict['instance_handle'],
                 user_name=pusherdict['user_name'],
                 app_id=pusherdict['app_id'],
                 app_display_name=pusherdict['app_display_name'],
diff --git a/synapse/rest/pusher.py b/synapse/rest/pusher.py
index 6b9a59adb6..4659c9b1d9 100644
--- a/synapse/rest/pusher.py
+++ b/synapse/rest/pusher.py
@@ -31,7 +31,7 @@ class PusherRestServlet(RestServlet):
 
         content = _parse_json(request)
 
-        reqd = ['kind', 'app_id', 'app_display_name',
+        reqd = ['instance_handle', 'kind', 'app_id', 'app_display_name',
                 'device_display_name', 'pushkey', 'lang', 'data']
         missing = []
         for i in reqd:
@@ -45,6 +45,7 @@ class PusherRestServlet(RestServlet):
         try:
             yield pusher_pool.add_pusher(
                 user_name=user.to_string(),
+                instance_handle=content['instance_handle'],
                 kind=content['kind'],
                 app_id=content['app_id'],
                 app_display_name=content['app_display_name'],
diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py
index 4eb30c7bdf..113cdc8a8e 100644
--- a/synapse/storage/pusher.py
+++ b/synapse/storage/pusher.py
@@ -29,7 +29,7 @@ class PusherStore(SQLBaseStore):
     @defer.inlineCallbacks
     def get_pushers_by_app_id_and_pushkey(self, app_id_and_pushkey):
         sql = (
-            "SELECT id, user_name, kind, app_id,"
+            "SELECT id, user_name, kind, instance_handle, app_id,"
             "app_display_name, device_display_name, pushkey, ts, data, "
             "last_token, last_success, failing_since "
             "FROM pushers "
@@ -45,15 +45,16 @@ class PusherStore(SQLBaseStore):
                 "id": r[0],
                 "user_name": r[1],
                 "kind": r[2],
-                "app_id": r[3],
-                "app_display_name": r[4],
-                "device_display_name": r[5],
-                "pushkey": r[6],
-                "pushkey_ts": r[7],
-                "data": r[8],
-                "last_token": r[9],
-                "last_success": r[10],
-                "failing_since": r[11]
+                "instance_handle": r[3],
+                "app_id": r[4],
+                "app_display_name": r[5],
+                "device_display_name": r[6],
+                "pushkey": r[7],
+                "pushkey_ts": r[8],
+                "data": r[9],
+                "last_token": r[10],
+                "last_success": r[11],
+                "failing_since": r[12]
             }
             for r in rows
         ]
@@ -63,7 +64,7 @@ class PusherStore(SQLBaseStore):
     @defer.inlineCallbacks
     def get_all_pushers(self):
         sql = (
-            "SELECT id, user_name, kind, app_id,"
+            "SELECT id, user_name, kind, instance_handle, app_id,"
             "app_display_name, device_display_name, pushkey, ts, data, "
             "last_token, last_success, failing_since "
             "FROM pushers"
@@ -76,15 +77,16 @@ class PusherStore(SQLBaseStore):
                 "id": r[0],
                 "user_name": r[1],
                 "kind": r[2],
-                "app_id": r[3],
-                "app_display_name": r[4],
-                "device_display_name": r[5],
-                "pushkey": r[6],
-                "pushkey_ts": r[7],
-                "data": r[8],
-                "last_token": r[9],
-                "last_success": r[10],
-                "failing_since": r[11]
+                "instance_handle": r[3],
+                "app_id": r[4],
+                "app_display_name": r[5],
+                "device_display_name": r[6],
+                "pushkey": r[7],
+                "pushkey_ts": r[8],
+                "data": r[9],
+                "last_token": r[10],
+                "last_success": r[11],
+                "failing_since": r[12]
             }
             for r in rows
         ]
@@ -92,7 +94,7 @@ class PusherStore(SQLBaseStore):
         defer.returnValue(ret)
 
     @defer.inlineCallbacks
-    def add_pusher(self, user_name, kind, app_id,
+    def add_pusher(self, user_name, instance_handle, kind, app_id,
                    app_display_name, device_display_name,
                    pushkey, pushkey_ts, lang, data):
         try:
@@ -105,6 +107,7 @@ class PusherStore(SQLBaseStore):
                 dict(
                     user_name=user_name,
                     kind=kind,
+                    instance_handle=instance_handle,
                     app_display_name=app_display_name,
                     device_display_name=device_display_name,
                     ts=pushkey_ts,
@@ -155,6 +158,7 @@ class PushersTable(Table):
         "id",
         "user_name",
         "kind",
+        "instance_handle",
         "app_id",
         "app_display_name",
         "device_display_name",
diff --git a/synapse/storage/schema/delta/v10.sql b/synapse/storage/schema/delta/v10.sql
index 689d2dff8b..b84ce20ef3 100644
--- a/synapse/storage/schema/delta/v10.sql
+++ b/synapse/storage/schema/delta/v10.sql
@@ -16,6 +16,7 @@
 CREATE TABLE IF NOT EXISTS pushers (
   id INTEGER PRIMARY KEY AUTOINCREMENT,
   user_name TEXT NOT NULL,
+  instance_handle varchar(32) NOT NULL,
   kind varchar(8) NOT NULL,
   app_id varchar(64) NOT NULL,
   app_display_name varchar(64) NOT NULL,
diff --git a/synapse/storage/schema/pusher.sql b/synapse/storage/schema/pusher.sql
index 689d2dff8b..b84ce20ef3 100644
--- a/synapse/storage/schema/pusher.sql
+++ b/synapse/storage/schema/pusher.sql
@@ -16,6 +16,7 @@
 CREATE TABLE IF NOT EXISTS pushers (
   id INTEGER PRIMARY KEY AUTOINCREMENT,
   user_name TEXT NOT NULL,
+  instance_handle varchar(32) NOT NULL,
   kind varchar(8) NOT NULL,
   app_id varchar(64) NOT NULL,
   app_display_name varchar(64) NOT NULL,
-- 
cgit 1.4.1


From dc93860619d56e88844e91f38f66341a32e4c704 Mon Sep 17 00:00:00 2001
From: David Baker <dave@matrix.org>
Date: Thu, 22 Jan 2015 17:37:12 +0000
Subject: Add rest API & store for creating push rules

Also make unrecognised request error look more like synapse errors
because it makes it easier to throw them from within rest classes.
---
 synapse/rest/push_rule.py    | 195 ++++++++++++++++++++++++++++++++++++++++++
 synapse/storage/push_rule.py | 196 +++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 391 insertions(+)
 create mode 100644 synapse/rest/push_rule.py
 create mode 100644 synapse/storage/push_rule.py

(limited to 'synapse/storage')

diff --git a/synapse/rest/push_rule.py b/synapse/rest/push_rule.py
new file mode 100644
index 0000000000..b5e74479cf
--- /dev/null
+++ b/synapse/rest/push_rule.py
@@ -0,0 +1,195 @@
+# -*- coding: utf-8 -*-
+# Copyright 2014 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from twisted.internet import defer
+
+from synapse.api.errors import SynapseError, Codes, UnrecognizedRequestError
+from base import RestServlet, client_path_pattern
+from synapse.storage.push_rule import InconsistentRuleException, RuleNotFoundException
+
+import json
+
+
+class PushRuleRestServlet(RestServlet):
+    PATTERN = client_path_pattern("/pushrules/.*$")
+
+    def rule_spec_from_path(self, path):
+        if len(path) < 2:
+            raise UnrecognizedRequestError()
+        if path[0] != 'pushrules':
+            raise UnrecognizedRequestError()
+
+        scope = path[1]
+        path = path[2:]
+        if scope not in ['global', 'device']:
+            raise UnrecognizedRequestError()
+
+        device = None
+        if scope == 'device':
+            if len(path) == 0:
+                raise UnrecognizedRequestError()
+            device = path[0]
+            path = path[1:]
+
+        if len(path) == 0:
+            raise UnrecognizedRequestError()
+
+        template = path[0]
+        path = path[1:]
+
+        if len(path) == 0:
+            raise UnrecognizedRequestError()
+
+        rule_id = path[0]
+
+        spec = {
+            'scope' : scope,
+            'template': template,
+            'rule_id': rule_id
+        }
+        if device:
+            spec['device'] = device
+        return spec
+
+    def rule_tuple_from_request_object(self, rule_template, rule_id, req_obj):
+        if rule_template in ['override', 'underride']:
+            if 'conditions' not in req_obj:
+                raise InvalidRuleException("Missing 'conditions'")
+            conditions = req_obj['conditions']
+            for c in conditions:
+                if 'kind' not in c:
+                    raise InvalidRuleException("Condition without 'kind'")
+        elif rule_template == 'room':
+            conditions = [{
+                'kind': 'event_match',
+                'key': 'room_id',
+                'pattern': rule_id
+            }]
+        elif rule_template == 'sender':
+            conditions = [{
+                'kind': 'event_match',
+                'key': 'user_id',
+                'pattern': rule_id
+            }]
+        elif rule_template == 'content':
+            if 'pattern' not in req_obj:
+                raise InvalidRuleException("Content rule missing 'pattern'")
+            conditions = [{
+                'kind': 'event_match',
+                'key': 'content.body',
+                'pattern': req_obj['pattern']
+            }]
+        else:
+            raise InvalidRuleException("Unknown rule template: %s" % (rule_template))
+
+        if 'actions' not in req_obj:
+            raise InvalidRuleException("No actions found")
+        actions = req_obj['actions']
+
+        for a in actions:
+            if a in ['notify', 'dont-notify', 'coalesce']:
+                pass
+            elif isinstance(a, dict) and 'set_sound' in a:
+                pass
+            else:
+                raise InvalidRuleException("Unrecognised action")
+
+        return (conditions, actions)
+
+    def priority_class_from_spec(self, spec):
+        map = {
+            'underride': 0,
+            'sender': 1,
+            'room': 2,
+            'content': 3,
+            'override': 4
+        }
+
+        if spec['template'] not in map.keys():
+            raise InvalidRuleException("Unknown template: %s" % (spec['kind']))
+        pc = map[spec['template']]
+
+        if spec['scope'] == 'device':
+            pc += 5
+
+        return pc
+
+    @defer.inlineCallbacks
+    def on_PUT(self, request):
+        spec = self.rule_spec_from_path(request.postpath)
+        try:
+            priority_class = self.priority_class_from_spec(spec)
+        except InvalidRuleException as e:
+            raise SynapseError(400, e.message)
+
+        user = yield self.auth.get_user_by_req(request)
+
+        content = _parse_json(request)
+
+        try:
+            (conditions, actions) = self.rule_tuple_from_request_object(
+                spec['template'],
+                spec['rule_id'],
+                content
+            )
+        except InvalidRuleException as e:
+            raise SynapseError(400, e.message)
+
+        before = request.args.get("before", None)
+        if before and len(before):
+            before = before[0]
+        after = request.args.get("after", None)
+        if after and len(after):
+            after = after[0]
+
+        try:
+            yield self.hs.get_datastore().add_push_rule(
+                user_name=user.to_string(),
+                rule_id=spec['rule_id'],
+                priority_class=priority_class,
+                conditions=conditions,
+                actions=actions,
+                before=before,
+                after=after
+            )
+        except InconsistentRuleException as e:
+            raise SynapseError(400, e.message)
+        except RuleNotFoundException:
+            raise SynapseError(400, "before/after rule not found")
+
+        defer.returnValue((200, {}))
+
+    def on_OPTIONS(self, _):
+        return 200, {}
+
+
+class InvalidRuleException(Exception):
+    pass
+
+
+# XXX: C+ped from rest/room.py - surely this should be common?
+def _parse_json(request):
+    try:
+        content = json.loads(request.content.read())
+        if type(content) != dict:
+            raise SynapseError(400, "Content must be a JSON object.",
+                               errcode=Codes.NOT_JSON)
+        return content
+    except ValueError:
+        raise SynapseError(400, "Content not JSON.", errcode=Codes.NOT_JSON)
+
+
+def register_servlets(hs, http_server):
+    PushRuleRestServlet(hs).register(http_server)
diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py
new file mode 100644
index 0000000000..76c4557600
--- /dev/null
+++ b/synapse/storage/push_rule.py
@@ -0,0 +1,196 @@
+# -*- coding: utf-8 -*-
+# Copyright 2014 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import collections
+
+from ._base import SQLBaseStore, Table
+from twisted.internet import defer
+
+import logging
+import copy
+import json
+
+logger = logging.getLogger(__name__)
+
+
+class PushRuleStore(SQLBaseStore):
+    @defer.inlineCallbacks
+    def get_push_rules_for_user_name(self, user_name):
+        sql = (
+            "SELECT "+",".join(PushRuleTable.fields)+
+            "FROM pushers "
+            "WHERE user_name = ?"
+        )
+
+        rows = yield self._execute(None, sql, user_name)
+
+        dicts = []
+        for r in rows:
+            d = {}
+            for i, f in enumerate(PushRuleTable.fields):
+                d[f] = r[i]
+            dicts.append(d)
+
+        defer.returnValue(dicts)
+
+    @defer.inlineCallbacks
+    def add_push_rule(self, **kwargs):
+        vals = copy.copy(kwargs)
+        if 'conditions' in vals:
+            vals['conditions'] = json.dumps(vals['conditions'])
+        if 'actions' in vals:
+            vals['actions'] = json.dumps(vals['actions'])
+        # we could check the rest of the keys are valid column names
+        # but sqlite will do that anyway so I think it's just pointless.
+        if 'id' in vals:
+            del vals['id']
+
+        if 'after' in kwargs or 'before' in kwargs:
+            ret = yield self.runInteraction(
+                "_add_push_rule_relative_txn",
+                self._add_push_rule_relative_txn,
+                **vals
+            )
+            defer.returnValue(ret)
+        else:
+            ret = yield self.runInteraction(
+                "_add_push_rule_highest_priority_txn",
+                self._add_push_rule_highest_priority_txn,
+                **vals
+            )
+            defer.returnValue(ret)
+
+    def _add_push_rule_relative_txn(self, txn, user_name, **kwargs):
+        after = None
+        relative_to_rule = None
+        if 'after' in kwargs and kwargs['after']:
+            after = kwargs['after']
+            relative_to_rule = after
+        if 'before' in kwargs and kwargs['before']:
+            relative_to_rule = kwargs['before']
+
+        # get the priority of the rule we're inserting after/before
+        sql = (
+            "SELECT priority_class, priority FROM "+PushRuleTable.table_name+
+            " WHERE user_name = ? and rule_id = ?"
+        )
+        txn.execute(sql, (user_name, relative_to_rule))
+        res = txn.fetchall()
+        if not res:
+            raise RuleNotFoundException()
+        (priority_class, base_rule_priority) = res[0]
+
+        if 'priority_class' in kwargs and kwargs['priority_class'] != priority_class:
+            raise InconsistentRuleException(
+                "Given priority class does not match class of relative rule"
+            )
+
+        new_rule = copy.copy(kwargs)
+        if 'before' in new_rule:
+            del new_rule['before']
+        if 'after' in new_rule:
+            del new_rule['after']
+        new_rule['priority_class'] = priority_class
+        new_rule['user_name'] = user_name
+
+        # check if the priority before/after is free
+        new_rule_priority = base_rule_priority
+        if after:
+            new_rule_priority -= 1
+        else:
+            new_rule_priority += 1
+
+        new_rule['priority'] = new_rule_priority
+
+        sql = (
+            "SELECT COUNT(*) FROM "+PushRuleTable.table_name+
+            " WHERE user_name = ? AND priority_class = ? AND priority = ?"
+        )
+        txn.execute(sql, (user_name, priority_class, new_rule_priority))
+        res = txn.fetchall()
+        num_conflicting = res[0][0]
+
+        # if there are conflicting rules, bump everything
+        if num_conflicting:
+            sql = "UPDATE "+PushRuleTable.table_name+" SET priority = priority "
+            if after:
+                sql += "-1"
+            else:
+                sql += "+1"
+            sql += " WHERE user_name = ? AND priority_class = ? AND priority "
+            if after:
+                sql += "<= ?"
+            else:
+                sql += ">= ?"
+
+            txn.execute(sql, (user_name, priority_class, new_rule_priority))
+
+        # now insert the new rule
+        sql = "INSERT OR REPLACE INTO "+PushRuleTable.table_name+" ("
+        sql += ",".join(new_rule.keys())+") VALUES ("
+        sql += ", ".join(["?" for _ in new_rule.keys()])+")"
+
+        txn.execute(sql, new_rule.values())
+
+    def _add_push_rule_highest_priority_txn(self, txn, user_name, priority_class, **kwargs):
+        # find the highest priority rule in that class
+        sql = (
+            "SELECT COUNT(*), MAX(priority) FROM "+PushRuleTable.table_name+
+            " WHERE user_name = ? and priority_class = ?"
+        )
+        txn.execute(sql, (user_name, priority_class))
+        res = txn.fetchall()
+        (how_many, highest_prio) = res[0]
+
+        new_prio = 0
+        if how_many > 0:
+            new_prio = highest_prio + 1
+
+        # and insert the new rule
+        new_rule = copy.copy(kwargs)
+        if 'id' in new_rule:
+            del new_rule['id']
+        new_rule['user_name'] = user_name
+        new_rule['priority_class'] = priority_class
+        new_rule['priority'] = new_prio
+
+        sql = "INSERT OR REPLACE INTO "+PushRuleTable.table_name+" ("
+        sql += ",".join(new_rule.keys())+") VALUES ("
+        sql += ", ".join(["?" for _ in new_rule.keys()])+")"
+
+        txn.execute(sql, new_rule.values())
+
+class RuleNotFoundException(Exception):
+    pass
+
+
+class InconsistentRuleException(Exception):
+    pass
+
+
+class PushRuleTable(Table):
+    table_name = "push_rules"
+
+    fields = [
+        "id",
+        "user_name",
+        "rule_id",
+        "priority_class",
+        "priority",
+        "conditions",
+        "actions",
+    ]
+
+    EntryType = collections.namedtuple("PushRuleEntry", fields)
\ No newline at end of file
-- 
cgit 1.4.1


From ede491b4e0c14d44ce43dd5b152abf148b54b9ed Mon Sep 17 00:00:00 2001
From: David Baker <dave@matrix.org>
Date: Thu, 22 Jan 2015 17:38:53 +0000
Subject: Oops: second part of commit dc938606

---
 synapse/api/errors.py                | 12 ++++++++++++
 synapse/http/server.py               |  8 ++------
 synapse/rest/__init__.py             |  3 ++-
 synapse/storage/__init__.py          |  3 +++
 synapse/storage/schema/delta/v10.sql | 13 +++++++++++++
 synapse/storage/schema/pusher.sql    | 13 +++++++++++++
 6 files changed, 45 insertions(+), 7 deletions(-)

(limited to 'synapse/storage')

diff --git a/synapse/api/errors.py b/synapse/api/errors.py
index a4155aebae..55181fe77e 100644
--- a/synapse/api/errors.py
+++ b/synapse/api/errors.py
@@ -21,6 +21,7 @@ logger = logging.getLogger(__name__)
 
 
 class Codes(object):
+    UNRECOGNIZED = "M_UNRECOGNIZED"
     UNAUTHORIZED = "M_UNAUTHORIZED"
     FORBIDDEN = "M_FORBIDDEN"
     BAD_JSON = "M_BAD_JSON"
@@ -82,6 +83,17 @@ class RegistrationError(SynapseError):
     pass
 
 
+class UnrecognizedRequestError(SynapseError):
+    """An error indicating we don't understand the request you're trying to make"""
+    def __init__(self, *args, **kwargs):
+        if "errcode" not in kwargs:
+            kwargs["errcode"] = Codes.NOT_FOUND
+        super(UnrecognizedRequestError, self).__init__(
+            400,
+            "Unrecognized request",
+            **kwargs
+        )
+
 class AuthError(SynapseError):
     """An error raised when there was a problem authorising an event."""
 
diff --git a/synapse/http/server.py b/synapse/http/server.py
index 8015a22edf..0f6539e1be 100644
--- a/synapse/http/server.py
+++ b/synapse/http/server.py
@@ -16,7 +16,7 @@
 
 from synapse.http.agent_name import AGENT_NAME
 from synapse.api.errors import (
-    cs_exception, SynapseError, CodeMessageException
+    cs_exception, SynapseError, CodeMessageException, UnrecognizedRequestError
 )
 from synapse.util.logcontext import LoggingContext
 
@@ -139,11 +139,7 @@ class JsonResource(HttpServer, resource.Resource):
                     return
 
             # Huh. No one wanted to handle that? Fiiiiiine. Send 400.
-            self._send_response(
-                request,
-                400,
-                {"error": "Unrecognized request"}
-            )
+            raise UnrecognizedRequestError()
         except CodeMessageException as e:
             if isinstance(e, SynapseError):
                 logger.info("%s SynapseError: %s - %s", request, e.code, e.msg)
diff --git a/synapse/rest/__init__.py b/synapse/rest/__init__.py
index 59521d0c77..8e5877cf3f 100644
--- a/synapse/rest/__init__.py
+++ b/synapse/rest/__init__.py
@@ -16,7 +16,7 @@
 
 from . import (
     room, events, register, login, profile, presence, initial_sync, directory,
-    voip, admin, pusher,
+    voip, admin, pusher, push_rule
 )
 
 
@@ -46,3 +46,4 @@ class RestServletFactory(object):
         voip.register_servlets(hs, client_resource)
         admin.register_servlets(hs, client_resource)
         pusher.register_servlets(hs, client_resource)
+        push_rule.register_servlets(hs, client_resource)
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index 191fe462a5..11706676d0 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -30,6 +30,7 @@ from .transactions import TransactionStore
 from .keys import KeyStore
 from .event_federation import EventFederationStore
 from .pusher import PusherStore
+from .push_rule import PushRuleStore
 from .media_repository import MediaRepositoryStore
 
 from .state import StateStore
@@ -62,6 +63,7 @@ SCHEMAS = [
     "event_edges",
     "event_signatures",
     "pusher",
+    "push_rules",
     "media_repository",
 ]
 
@@ -85,6 +87,7 @@ class DataStore(RoomMemberStore, RoomStore,
                 EventFederationStore,
                 MediaRepositoryStore,
                 PusherStore,
+                PushRuleStore
                 ):
 
     def __init__(self, hs):
diff --git a/synapse/storage/schema/delta/v10.sql b/synapse/storage/schema/delta/v10.sql
index b84ce20ef3..8c4dfd5c1b 100644
--- a/synapse/storage/schema/delta/v10.sql
+++ b/synapse/storage/schema/delta/v10.sql
@@ -31,3 +31,16 @@ CREATE TABLE IF NOT EXISTS pushers (
   FOREIGN KEY(user_name) REFERENCES users(name),
   UNIQUE (app_id, pushkey)
 );
+
+CREATE TABLE IF NOT EXISTS push_rules (
+  id INTEGER PRIMARY KEY AUTOINCREMENT,
+  user_name TEXT NOT NULL,
+  rule_id TEXT NOT NULL,
+  priority_class TINYINT NOT NULL,
+  priority INTEGER NOT NULL DEFAULT 0,
+  conditions TEXT NOT NULL,
+  actions TEXT NOT NULL,
+  UNIQUE(user_name, rule_id)
+);
+
+CREATE INDEX IF NOT EXISTS push_rules_user_name on push_rules (user_name);
diff --git a/synapse/storage/schema/pusher.sql b/synapse/storage/schema/pusher.sql
index b84ce20ef3..8c4dfd5c1b 100644
--- a/synapse/storage/schema/pusher.sql
+++ b/synapse/storage/schema/pusher.sql
@@ -31,3 +31,16 @@ CREATE TABLE IF NOT EXISTS pushers (
   FOREIGN KEY(user_name) REFERENCES users(name),
   UNIQUE (app_id, pushkey)
 );
+
+CREATE TABLE IF NOT EXISTS push_rules (
+  id INTEGER PRIMARY KEY AUTOINCREMENT,
+  user_name TEXT NOT NULL,
+  rule_id TEXT NOT NULL,
+  priority_class TINYINT NOT NULL,
+  priority INTEGER NOT NULL DEFAULT 0,
+  conditions TEXT NOT NULL,
+  actions TEXT NOT NULL,
+  UNIQUE(user_name, rule_id)
+);
+
+CREATE INDEX IF NOT EXISTS push_rules_user_name on push_rules (user_name);
-- 
cgit 1.4.1


From 7ecb49ef25937558f1a19a8fe47879d4b9116316 Mon Sep 17 00:00:00 2001
From: David Baker <dave@matrix.org>
Date: Thu, 22 Jan 2015 17:53:30 +0000
Subject: Insufficient newlines

---
 synapse/storage/push_rule.py | 1 +
 1 file changed, 1 insertion(+)

(limited to 'synapse/storage')

diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py
index 76c4557600..dbbb35b2ab 100644
--- a/synapse/storage/push_rule.py
+++ b/synapse/storage/push_rule.py
@@ -172,6 +172,7 @@ class PushRuleStore(SQLBaseStore):
 
         txn.execute(sql, new_rule.values())
 
+
 class RuleNotFoundException(Exception):
     pass
 
-- 
cgit 1.4.1


From 673773b21701c91997512d568bcc8d49a5586b3a Mon Sep 17 00:00:00 2001
From: David Baker <dave@matrix.org>
Date: Thu, 22 Jan 2015 18:27:07 +0000
Subject: oops, this is not its own schema file

---
 synapse/storage/__init__.py | 1 -
 1 file changed, 1 deletion(-)

(limited to 'synapse/storage')

diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index 11706676d0..8f56d90d95 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -63,7 +63,6 @@ SCHEMAS = [
     "event_edges",
     "event_signatures",
     "pusher",
-    "push_rules",
     "media_repository",
 ]
 
-- 
cgit 1.4.1


From 8a850573c9cf50dd83ba47c033b28fe2bbbaf9d4 Mon Sep 17 00:00:00 2001
From: David Baker <dave@matrix.org>
Date: Thu, 22 Jan 2015 19:32:17 +0000
Subject: As yet fairly untested GET API for push rules

---
 synapse/api/errors.py               |  14 +++-
 synapse/rest/client/v1/push_rule.py | 138 +++++++++++++++++++++++++++++++++---
 synapse/storage/push_rule.py        |   8 +--
 3 files changed, 145 insertions(+), 15 deletions(-)

(limited to 'synapse/storage')

diff --git a/synapse/api/errors.py b/synapse/api/errors.py
index 55181fe77e..01207282d6 100644
--- a/synapse/api/errors.py
+++ b/synapse/api/errors.py
@@ -87,13 +87,25 @@ class UnrecognizedRequestError(SynapseError):
     """An error indicating we don't understand the request you're trying to make"""
     def __init__(self, *args, **kwargs):
         if "errcode" not in kwargs:
-            kwargs["errcode"] = Codes.NOT_FOUND
+            kwargs["errcode"] = Codes.UNRECOGNIZED
         super(UnrecognizedRequestError, self).__init__(
             400,
             "Unrecognized request",
             **kwargs
         )
 
+
+class NotFoundError(SynapseError):
+    """An error indicating we can't find the thing you asked for"""
+    def __init__(self, *args, **kwargs):
+        if "errcode" not in kwargs:
+            kwargs["errcode"] = Codes.NOT_FOUND
+        super(UnrecognizedRequestError, self).__init__(
+            404,
+            "Not found",
+            **kwargs
+        )
+
 class AuthError(SynapseError):
     """An error raised when there was a problem authorising an event."""
 
diff --git a/synapse/rest/client/v1/push_rule.py b/synapse/rest/client/v1/push_rule.py
index b5e74479cf..2803c1f071 100644
--- a/synapse/rest/client/v1/push_rule.py
+++ b/synapse/rest/client/v1/push_rule.py
@@ -15,7 +15,7 @@
 
 from twisted.internet import defer
 
-from synapse.api.errors import SynapseError, Codes, UnrecognizedRequestError
+from synapse.api.errors import SynapseError, Codes, UnrecognizedRequestError, NotFoundError
 from base import RestServlet, client_path_pattern
 from synapse.storage.push_rule import InconsistentRuleException, RuleNotFoundException
 
@@ -24,6 +24,14 @@ import json
 
 class PushRuleRestServlet(RestServlet):
     PATTERN = client_path_pattern("/pushrules/.*$")
+    PRIORITY_CLASS_MAP = {
+        'underride': 0,
+        'sender': 1,
+        'room': 2,
+        'content': 3,
+        'override': 4
+    }
+    PRIORITY_CLASS_INVERSE_MAP = {v: k for k,v in PRIORITY_CLASS_MAP.items()}
 
     def rule_spec_from_path(self, path):
         if len(path) < 2:
@@ -109,15 +117,7 @@ class PushRuleRestServlet(RestServlet):
         return (conditions, actions)
 
     def priority_class_from_spec(self, spec):
-        map = {
-            'underride': 0,
-            'sender': 1,
-            'room': 2,
-            'content': 3,
-            'override': 4
-        }
-
-        if spec['template'] not in map.keys():
+        if spec['template'] not in PushRuleRestServlet.PRIORITY_CLASS_MAP.keys():
             raise InvalidRuleException("Unknown template: %s" % (spec['kind']))
         pc = map[spec['template']]
 
@@ -171,10 +171,128 @@ class PushRuleRestServlet(RestServlet):
 
         defer.returnValue((200, {}))
 
+    @defer.inlineCallbacks
+    def on_GET(self, request):
+        user = yield self.auth.get_user_by_req(request)
+
+        # we build up the full structure and then decide which bits of it
+        # to send which means doing unnecessary work sometimes but is
+        # is probably not going to make a whole lot of difference
+        rawrules = yield self.hs.get_datastore().get_push_rules_for_user_name(user.to_string())
+
+        rules = {'global': {}, 'device': {}}
+
+        rules['global'] = _add_empty_priority_class_arrays(rules['global'])
+
+        for r in rawrules:
+            rulearray = None
+
+            r["conditions"] = json.loads(r["conditions"])
+            r["actions"] = json.loads(r["actions"])
+
+            template_name = _priority_class_to_template_name(r['priority_class'])
+
+            if r['priority_class'] > PushRuleRestServlet.PRIORITY_CLASS_MAP['override']:
+                # per-device rule
+                instance_handle = _instance_handle_from_conditions(r["conditions"])
+                if not instance_handle:
+                    continue
+                if instance_handle not in rules['device']:
+                    rules['device'][instance_handle] = []
+                    rules['device'][instance_handle] = \
+                        _add_empty_priority_class_arrays(rules['device'][instance_handle])
+
+                rulearray = rules['device'][instance_handle]
+            else:
+                rulearray = rules['global'][template_name]
+
+            template_rule = _rule_to_template(r)
+            if template_rule:
+                rulearray.append(template_rule)
+
+        path = request.postpath[1:]
+        if path == []:
+            defer.returnValue((200, rules))
+
+        if path[0] == 'global':
+            path = path[1:]
+            result = _filter_ruleset_with_path(rules['global'], path)
+            defer.returnValue((200, result))
+        elif path[0] == 'device':
+            path = path[1:]
+            if path == []:
+                raise UnrecognizedRequestError
+            instance_handle = path[0]
+            if instance_handle not in rules['device']:
+                ret = {}
+                ret = _add_empty_priority_class_arrays(ret)
+                defer.returnValue((200, ret))
+            ruleset = rules['device'][instance_handle]
+            result = _filter_ruleset_with_path(ruleset, path)
+            defer.returnValue((200, result))
+        else:
+            raise UnrecognizedRequestError()
+
+
     def on_OPTIONS(self, _):
         return 200, {}
 
 
+def _add_empty_priority_class_arrays(d):
+    for pc in PushRuleRestServlet.PRIORITY_CLASS_MAP.keys():
+        d[pc] = []
+    return d
+
+def _instance_handle_from_conditions(conditions):
+    """
+    Given a list of conditions, return the instance handle of the
+    device rule if there is one
+    """
+    for c in conditions:
+        if c['kind'] == 'device':
+            return c['instance_handle']
+    return None
+
+def _filter_ruleset_with_path(ruleset, path):
+    if path == []:
+        return ruleset
+    template_kind = path[0]
+    if template_kind not in ruleset:
+        raise UnrecognizedRequestError()
+    path = path[1:]
+    if path == []:
+        return ruleset[template_kind]
+    rule_id = path[0]
+    for r in ruleset[template_kind]:
+        if r['rule_id'] == rule_id:
+            return r
+    raise NotFoundError
+
+def _priority_class_to_template_name(pc):
+    if pc > PushRuleRestServlet.PRIORITY_CLASS_MAP['override']:
+        # per-device
+        prio_class_index = pc - PushRuleRestServlet.PRIORITY_CLASS_MAP['override']
+        return PushRuleRestServlet.PRIORITY_CLASS_INVERSE_MAP[prio_class_index]
+    else:
+        return PushRuleRestServlet.PRIORITY_CLASS_INVERSE_MAP[pc]
+
+def _rule_to_template(rule):
+    template_name = _priority_class_to_template_name(rule['priority_class'])
+    if template_name in ['override', 'underride']:
+        return {k:rule[k] for k in ["rule_id", "conditions", "actions"]}
+    elif template_name in ["sender", "room"]:
+        return {k:rule[k] for k in ["rule_id", "actions"]}
+    elif template_name == 'content':
+        if len(rule["conditions"]) != 1:
+            return None
+        thecond = rule["conditions"][0]
+        if "pattern" not in thecond:
+            return None
+        ret = {k:rule[k] for k in ["rule_id", "actions"]}
+        ret["pattern"] = thecond["pattern"]
+        return ret
+
+
 class InvalidRuleException(Exception):
     pass
 
diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py
index dbbb35b2ab..d087257ffc 100644
--- a/synapse/storage/push_rule.py
+++ b/synapse/storage/push_rule.py
@@ -29,11 +29,11 @@ class PushRuleStore(SQLBaseStore):
     @defer.inlineCallbacks
     def get_push_rules_for_user_name(self, user_name):
         sql = (
-            "SELECT "+",".join(PushRuleTable.fields)+
-            "FROM pushers "
-            "WHERE user_name = ?"
+            "SELECT "+",".join(PushRuleTable.fields)+" "
+            "FROM "+PushRuleTable.table_name+" "
+            "WHERE user_name = ? "
+            "ORDER BY priority_class DESC, priority DESC"
         )
-
         rows = yield self._execute(None, sql, user_name)
 
         dicts = []
-- 
cgit 1.4.1


From bcd48b9636071543fa64e7fb066275d1c9c1e363 Mon Sep 17 00:00:00 2001
From: David Baker <dave@matrix.org>
Date: Fri, 23 Jan 2015 10:28:25 +0000
Subject: Fix adding rules without before/after & add the rule that we couldn't
 find to the error

---
 synapse/rest/client/v1/push_rule.py | 4 ++--
 synapse/storage/push_rule.py        | 8 +++++---
 2 files changed, 7 insertions(+), 5 deletions(-)

(limited to 'synapse/storage')

diff --git a/synapse/rest/client/v1/push_rule.py b/synapse/rest/client/v1/push_rule.py
index 7df3fc7f09..77a0772479 100644
--- a/synapse/rest/client/v1/push_rule.py
+++ b/synapse/rest/client/v1/push_rule.py
@@ -166,8 +166,8 @@ class PushRuleRestServlet(RestServlet):
             )
         except InconsistentRuleException as e:
             raise SynapseError(400, e.message)
-        except RuleNotFoundException:
-            raise SynapseError(400, "before/after rule not found")
+        except RuleNotFoundException as e:
+            raise SynapseError(400, e.message)
 
         defer.returnValue((200, {}))
 
diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py
index d087257ffc..2366090e09 100644
--- a/synapse/storage/push_rule.py
+++ b/synapse/storage/push_rule.py
@@ -46,7 +46,7 @@ class PushRuleStore(SQLBaseStore):
         defer.returnValue(dicts)
 
     @defer.inlineCallbacks
-    def add_push_rule(self, **kwargs):
+    def add_push_rule(self, before, after, **kwargs):
         vals = copy.copy(kwargs)
         if 'conditions' in vals:
             vals['conditions'] = json.dumps(vals['conditions'])
@@ -57,10 +57,12 @@ class PushRuleStore(SQLBaseStore):
         if 'id' in vals:
             del vals['id']
 
-        if 'after' in kwargs or 'before' in kwargs:
+        if before or after:
             ret = yield self.runInteraction(
                 "_add_push_rule_relative_txn",
                 self._add_push_rule_relative_txn,
+                before=before,
+                after=after,
                 **vals
             )
             defer.returnValue(ret)
@@ -89,7 +91,7 @@ class PushRuleStore(SQLBaseStore):
         txn.execute(sql, (user_name, relative_to_rule))
         res = txn.fetchall()
         if not res:
-            raise RuleNotFoundException()
+            raise RuleNotFoundException("before/after rule not found: %s" % (relative_to_rule))
         (priority_class, base_rule_priority) = res[0]
 
         if 'priority_class' in kwargs and kwargs['priority_class'] != priority_class:
-- 
cgit 1.4.1


From 5f84ba8ea1991dff279f0135f474d9debfd1419a Mon Sep 17 00:00:00 2001
From: David Baker <dave@matrix.org>
Date: Fri, 23 Jan 2015 17:49:37 +0000
Subject: Add API to delete push rules.

---
 synapse/rest/client/v1/push_rule.py | 41 ++++++++++++++++++++++++++++++++++++-
 synapse/storage/push_rule.py        |  9 ++++++++
 2 files changed, 49 insertions(+), 1 deletion(-)

(limited to 'synapse/storage')

diff --git a/synapse/rest/client/v1/push_rule.py b/synapse/rest/client/v1/push_rule.py
index 9dc2c0e11e..50bf5b9008 100644
--- a/synapse/rest/client/v1/push_rule.py
+++ b/synapse/rest/client/v1/push_rule.py
@@ -15,7 +15,8 @@
 
 from twisted.internet import defer
 
-from synapse.api.errors import SynapseError, Codes, UnrecognizedRequestError, NotFoundError
+from synapse.api.errors import SynapseError, Codes, UnrecognizedRequestError, NotFoundError, \
+    StoreError
 from base import RestServlet, client_path_pattern
 from synapse.storage.push_rule import InconsistentRuleException, RuleNotFoundException
 
@@ -175,6 +176,44 @@ class PushRuleRestServlet(RestServlet):
 
         defer.returnValue((200, {}))
 
+    @defer.inlineCallbacks
+    def on_DELETE(self, request):
+        spec = self.rule_spec_from_path(request.postpath)
+        try:
+            priority_class = _priority_class_from_spec(spec)
+        except InvalidRuleException as e:
+            raise SynapseError(400, e.message)
+
+        user = yield self.auth.get_user_by_req(request)
+
+        if 'device' in spec:
+            rules = yield self.hs.get_datastore().get_push_rules_for_user_name(
+                user.to_string()
+            )
+
+            for r in rules:
+                conditions = json.loads(r['conditions'])
+                ih = _instance_handle_from_conditions(conditions)
+                if ih == spec['device'] and r['priority_class'] == priority_class:
+                    yield self.hs.get_datastore().delete_push_rule(
+                        user.to_string(), spec['rule_id']
+                    )
+                    defer.returnValue((200, {}))
+            raise NotFoundError()
+        else:
+            try:
+                yield self.hs.get_datastore().delete_push_rule(
+                    user.to_string(), spec['rule_id'],
+                    priority_class=priority_class
+                )
+                defer.returnValue((200, {}))
+            except StoreError as e:
+                if e.code == 404:
+                    raise NotFoundError()
+                else:
+                    raise
+
+
     @defer.inlineCallbacks
     def on_GET(self, request):
         user = yield self.auth.get_user_by_req(request)
diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py
index 2366090e09..ca04f2ccee 100644
--- a/synapse/storage/push_rule.py
+++ b/synapse/storage/push_rule.py
@@ -174,6 +174,15 @@ class PushRuleStore(SQLBaseStore):
 
         txn.execute(sql, new_rule.values())
 
+    @defer.inlineCallbacks
+    def delete_push_rule(self, user_name, rule_id):
+        yield self._simple_delete_one(
+            PushRuleTable.table_name,
+            {
+                'user_name': user_name,
+                'rule_id': rule_id
+            }
+        )
 
 class RuleNotFoundException(Exception):
     pass
-- 
cgit 1.4.1


From 69a75b7ebebb393c1ce84ff949f3480a6af0a782 Mon Sep 17 00:00:00 2001
From: David Baker <dave@matrix.org>
Date: Mon, 26 Jan 2015 16:52:47 +0000
Subject: Add brackets to make get room name / alias work

---
 synapse/storage/__init__.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

(limited to 'synapse/storage')

diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index 8f56d90d95..2534d109fd 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -402,8 +402,8 @@ class DataStore(RoomMemberStore, RoomStore,
             "redacted": del_sql,
         }
 
-        sql += " AND (s.type = 'm.room.name' AND s.state_key = '')"
-        sql += " OR s.type = 'm.room.aliases'"
+        sql += " AND ((s.type = 'm.room.name' AND s.state_key = '')"
+        sql += " OR s.type = 'm.room.aliases')"
         args = (room_id,)
 
         results = yield self._execute_and_decode(sql, *args)
-- 
cgit 1.4.1


From 0cbb6b0f5235e4501a0fb360e881d152644a17cd Mon Sep 17 00:00:00 2001
From: David Baker <dave@matrix.org>
Date: Wed, 28 Jan 2015 14:44:41 +0000
Subject: Google doc style

---
 synapse/storage/_base.py | 9 +++++----
 1 file changed, 5 insertions(+), 4 deletions(-)

(limited to 'synapse/storage')

diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 4f172d3967..809c81f47f 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -195,10 +195,11 @@ class SQLBaseStore(object):
 
     def _simple_upsert(self, table, keyvalues, values):
         """
-        :param table: The table to upsert into
-        :param keyvalues: Dict of the unique key tables and their new values
-        :param values: Dict of all the nonunique columns and their new values
-        :return: A deferred
+        Args:
+            table (str): The table to upsert into
+            keyvalues (dict): The unique key tables and their new values
+            values (dict): The nonunique columns and their new values
+        Returns: A deferred
         """
         return self.runInteraction(
             "_simple_upsert",
-- 
cgit 1.4.1


From fb0928097a0dc1606aebb9aed8f070bcea304178 Mon Sep 17 00:00:00 2001
From: David Baker <dave@matrix.org>
Date: Wed, 28 Jan 2015 14:48:07 +0000
Subject: More magic commas (including the place I copied it from...)

---
 synapse/storage/_base.py | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

(limited to 'synapse/storage')

diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 809c81f47f..9261c999cb 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -210,8 +210,8 @@ class SQLBaseStore(object):
         # Try to update
         sql = "UPDATE %s SET %s WHERE %s" % (
             table,
-            ", ".join("%s = ?" % (k) for k in values),
-            " AND ".join("%s = ?" % (k) for k in keyvalues)
+            ", ".join("%s = ?" % (k,) for k in values),
+            " AND ".join("%s = ?" % (k,) for k in keyvalues)
         )
         sqlargs = values.values() + keyvalues.values()
         logger.debug(
@@ -390,8 +390,8 @@ class SQLBaseStore(object):
         if updatevalues:
             update_sql = "UPDATE %s SET %s WHERE %s" % (
                 table,
-                ", ".join("%s = ?" % (k) for k in updatevalues),
-                " AND ".join("%s = ?" % (k) for k in keyvalues)
+                ", ".join("%s = ?" % (k,) for k in updatevalues),
+                " AND ".join("%s = ?" % (k,) for k in keyvalues)
             )
 
         def func(txn):
-- 
cgit 1.4.1


From 6d485dd1c727e7ecfe3991066bd058794ae05051 Mon Sep 17 00:00:00 2001
From: David Baker <dave@matrix.org>
Date: Wed, 28 Jan 2015 14:48:42 +0000
Subject: unnecessary newlines

---
 synapse/storage/_base.py | 2 --
 1 file changed, 2 deletions(-)

(limited to 'synapse/storage')

diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 9261c999cb..4e8bd3faa9 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -237,8 +237,6 @@ class SQLBaseStore(object):
             )
             txn.execute(sql, allvalues.values())
 
-
-
     def _simple_select_one(self, table, keyvalues, retcols,
                            allow_none=False):
         """Executes a SELECT query on the named table, which is expected to
-- 
cgit 1.4.1


From 445ad9941ea2e4038846aa6fed456e3250ae49b1 Mon Sep 17 00:00:00 2001
From: David Baker <dave@matrix.org>
Date: Wed, 28 Jan 2015 14:49:59 +0000
Subject: Redundant parens

---
 synapse/storage/push_rule.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

(limited to 'synapse/storage')

diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py
index ca04f2ccee..f5a736be44 100644
--- a/synapse/storage/push_rule.py
+++ b/synapse/storage/push_rule.py
@@ -92,7 +92,7 @@ class PushRuleStore(SQLBaseStore):
         res = txn.fetchall()
         if not res:
             raise RuleNotFoundException("before/after rule not found: %s" % (relative_to_rule))
-        (priority_class, base_rule_priority) = res[0]
+        priority_class, base_rule_priority = res[0]
 
         if 'priority_class' in kwargs and kwargs['priority_class'] != priority_class:
             raise InconsistentRuleException(
-- 
cgit 1.4.1


From 93aac9bb7b3023e6c82961b1cdd655a48ec567fb Mon Sep 17 00:00:00 2001
From: David Baker <dave@matrix.org>
Date: Wed, 28 Jan 2015 14:51:01 +0000
Subject: Newline

---
 synapse/storage/push_rule.py | 1 +
 1 file changed, 1 insertion(+)

(limited to 'synapse/storage')

diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py
index f5a736be44..48105234f6 100644
--- a/synapse/storage/push_rule.py
+++ b/synapse/storage/push_rule.py
@@ -184,6 +184,7 @@ class PushRuleStore(SQLBaseStore):
             }
         )
 
+
 class RuleNotFoundException(Exception):
     pass
 
-- 
cgit 1.4.1


From e78dd332928c111c8a62985bce0a3c1c5631244e Mon Sep 17 00:00:00 2001
From: David Baker <dave@matrix.org>
Date: Wed, 28 Jan 2015 14:52:58 +0000
Subject: Use %s instead of +

---
 synapse/storage/push_rule.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

(limited to 'synapse/storage')

diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py
index 48105234f6..0342996ed1 100644
--- a/synapse/storage/push_rule.py
+++ b/synapse/storage/push_rule.py
@@ -85,8 +85,8 @@ class PushRuleStore(SQLBaseStore):
 
         # get the priority of the rule we're inserting after/before
         sql = (
-            "SELECT priority_class, priority FROM "+PushRuleTable.table_name+
-            " WHERE user_name = ? and rule_id = ?"
+            "SELECT priority_class, priority FROM ? "
+            "WHERE user_name = ? and rule_id = ?" % (PushRuleTable.table_name,)
         )
         txn.execute(sql, (user_name, relative_to_rule))
         res = txn.fetchall()
-- 
cgit 1.4.1