diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index 60c2d67425..ac3bf5cee5 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -29,6 +29,7 @@ from .stream import StreamStore
from .transactions import TransactionStore
from .keys import KeyStore
from .event_federation import EventFederationStore
+from .pusher import PusherStore
from .media_repository import MediaRepositoryStore
from .state import StateStore
@@ -60,6 +61,7 @@ SCHEMAS = [
"state",
"event_edges",
"event_signatures",
+ "pusher",
"media_repository",
]
@@ -82,6 +84,7 @@ class DataStore(RoomMemberStore, RoomStore,
DirectoryStore, KeyStore, StateStore, SignatureStore,
EventFederationStore,
MediaRepositoryStore,
+ PusherStore,
):
def __init__(self, hs):
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 6dc857c4aa..efb2664680 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -193,6 +193,51 @@ class SQLBaseStore(object):
txn.execute(sql, values.values())
return txn.lastrowid
+ def _simple_upsert(self, table, keyvalues, values):
+ """
+ :param table: The table to upsert into
+ :param keyvalues: Dict of the unique key tables and their new values
+ :param values: Dict of all the nonunique columns and their new values
+ :return: A deferred
+ """
+ return self.runInteraction(
+ "_simple_upsert",
+ self._simple_upsert_txn, table, keyvalues, values
+ )
+
+ def _simple_upsert_txn(self, txn, table, keyvalues, values):
+ # Try to update
+ sql = "UPDATE %s SET %s WHERE %s" % (
+ table,
+ ", ".join("%s = ?" % (k) for k in values),
+ " AND ".join("%s = ?" % (k) for k in keyvalues)
+ )
+ sqlargs = values.values() + keyvalues.values()
+ logger.debug(
+ "[SQL] %s Args=%s",
+ sql, sqlargs,
+ )
+
+ txn.execute(sql, sqlargs)
+ if txn.rowcount == 0:
+ # We didn't update and rows so insert a new one
+ allvalues = {}
+ allvalues.update(keyvalues)
+ allvalues.update(values)
+
+ sql = "INSERT INTO %s (%s) VALUES (%s)" % (
+ table,
+ ", ".join(k for k in allvalues),
+ ", ".join("?" for _ in allvalues)
+ )
+ logger.debug(
+ "[SQL] %s Args=%s",
+ sql, keyvalues.values(),
+ )
+ txn.execute(sql, allvalues.values())
+
+
+
def _simple_select_one(self, table, keyvalues, retcols,
allow_none=False):
"""Executes a SELECT query on the named table, which is expected to
diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py
new file mode 100644
index 0000000000..9b5170a5f7
--- /dev/null
+++ b/synapse/storage/pusher.py
@@ -0,0 +1,156 @@
+# -*- coding: utf-8 -*-
+# Copyright 2014 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import collections
+
+from ._base import SQLBaseStore, Table
+from twisted.internet import defer
+
+from synapse.api.errors import StoreError
+
+import logging
+
+logger = logging.getLogger(__name__)
+
+
+class PusherStore(SQLBaseStore):
+ @defer.inlineCallbacks
+ def get_pushers_by_app_id_and_pushkey(self, app_id_and_pushkey):
+ sql = (
+ "SELECT id, user_name, kind, app_id,"
+ "app_display_name, device_display_name, pushkey, data, "
+ "last_token, last_success, failing_since "
+ "FROM pushers "
+ "WHERE app_id = ? AND pushkey = ?"
+ )
+
+ rows = yield self._execute(
+ None, sql, app_id_and_pushkey[0], app_id_and_pushkey[1]
+ )
+
+ ret = [
+ {
+ "id": r[0],
+ "user_name": r[1],
+ "kind": r[2],
+ "app_id": r[3],
+ "app_display_name": r[4],
+ "device_display_name": r[5],
+ "pushkey": r[6],
+ "data": r[7],
+ "last_token": r[8],
+ "last_success": r[9],
+ "failing_since": r[10]
+ }
+ for r in rows
+ ]
+
+ defer.returnValue(ret[0])
+
+ @defer.inlineCallbacks
+ def get_all_pushers(self):
+ sql = (
+ "SELECT id, user_name, kind, app_id,"
+ "app_display_name, device_display_name, pushkey, data, "
+ "last_token, last_success, failing_since "
+ "FROM pushers"
+ )
+
+ rows = yield self._execute(None, sql)
+
+ ret = [
+ {
+ "id": r[0],
+ "user_name": r[1],
+ "kind": r[2],
+ "app_id": r[3],
+ "app_display_name": r[4],
+ "device_display_name": r[5],
+ "pushkey": r[6],
+ "data": r[7],
+ "last_token": r[8],
+ "last_success": r[9],
+ "failing_since": r[10]
+ }
+ for r in rows
+ ]
+
+ defer.returnValue(ret)
+
+ @defer.inlineCallbacks
+ def add_pusher(self, user_name, kind, app_id,
+ app_display_name, device_display_name, pushkey, data):
+ try:
+ yield self._simple_upsert(
+ PushersTable.table_name,
+ dict(
+ app_id=app_id,
+ pushkey=pushkey,
+ ),
+ dict(
+ user_name=user_name,
+ kind=kind,
+ app_display_name=app_display_name,
+ device_display_name=device_display_name,
+ data=data
+ ))
+ except Exception as e:
+ logger.error("create_pusher with failed: %s", e)
+ raise StoreError(500, "Problem creating pusher.")
+
+ @defer.inlineCallbacks
+ def update_pusher_last_token(self, user_name, pushkey, last_token):
+ yield self._simple_update_one(
+ PushersTable.table_name,
+ {'user_name': user_name, 'pushkey': pushkey},
+ {'last_token': last_token}
+ )
+
+ @defer.inlineCallbacks
+ def update_pusher_last_token_and_success(self, user_name, pushkey,
+ last_token, last_success):
+ yield self._simple_update_one(
+ PushersTable.table_name,
+ {'user_name': user_name, 'pushkey': pushkey},
+ {'last_token': last_token, 'last_success': last_success}
+ )
+
+ @defer.inlineCallbacks
+ def update_pusher_failing_since(self, user_name, pushkey, failing_since):
+ yield self._simple_update_one(
+ PushersTable.table_name,
+ {'user_name': user_name, 'pushkey': pushkey},
+ {'failing_since': failing_since}
+ )
+
+
+class PushersTable(Table):
+ table_name = "pushers"
+
+ fields = [
+ "id",
+ "user_name",
+ "kind",
+ "app_id",
+ "app_display_name",
+ "device_display_name",
+ "pushkey",
+ "data",
+ "last_token",
+ "last_success",
+ "failing_since"
+ ]
+
+ EntryType = collections.namedtuple("PusherEntry", fields)
\ No newline at end of file
diff --git a/synapse/storage/schema/delta/v10.sql b/synapse/storage/schema/delta/v10.sql
new file mode 100644
index 0000000000..799e48d780
--- /dev/null
+++ b/synapse/storage/schema/delta/v10.sql
@@ -0,0 +1,30 @@
+/* Copyright 2014 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+-- Push notification endpoints that users have configured
+CREATE TABLE IF NOT EXISTS pushers (
+ id INTEGER PRIMARY KEY AUTOINCREMENT,
+ user_name TEXT NOT NULL,
+ kind varchar(8) NOT NULL,
+ app_id varchar(64) NOT NULL,
+ app_display_name varchar(64) NOT NULL,
+ device_display_name varchar(128) NOT NULL,
+ pushkey blob NOT NULL,
+ data blob,
+ last_token TEXT,
+ last_success BIGINT,
+ failing_since BIGINT,
+ FOREIGN KEY(user_name) REFERENCES users(name),
+ UNIQUE (app_id, pushkey)
+);
diff --git a/synapse/storage/schema/pusher.sql b/synapse/storage/schema/pusher.sql
new file mode 100644
index 0000000000..799e48d780
--- /dev/null
+++ b/synapse/storage/schema/pusher.sql
@@ -0,0 +1,30 @@
+/* Copyright 2014 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+-- Push notification endpoints that users have configured
+CREATE TABLE IF NOT EXISTS pushers (
+ id INTEGER PRIMARY KEY AUTOINCREMENT,
+ user_name TEXT NOT NULL,
+ kind varchar(8) NOT NULL,
+ app_id varchar(64) NOT NULL,
+ app_display_name varchar(64) NOT NULL,
+ device_display_name varchar(128) NOT NULL,
+ pushkey blob NOT NULL,
+ data blob,
+ last_token TEXT,
+ last_success BIGINT,
+ failing_since BIGINT,
+ FOREIGN KEY(user_name) REFERENCES users(name),
+ UNIQUE (app_id, pushkey)
+);
|