diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 4881f03368..eb8cc4a9f3 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -195,6 +195,51 @@ class SQLBaseStore(object):
txn.execute(sql, values.values())
return txn.lastrowid
+ def _simple_upsert(self, table, keyvalues, values):
+ """
+ :param table: The table to upsert into
+ :param keyvalues: Dict of the unique key tables and their new values
+ :param values: Dict of all the nonunique columns and their new values
+ :return: A deferred
+ """
+ return self.runInteraction(
+ "_simple_upsert",
+ self._simple_upsert_txn, table, keyvalues, values
+ )
+
+ def _simple_upsert_txn(self, txn, table, keyvalues, values):
+ # Try to update
+ sql = "UPDATE %s SET %s WHERE %s" % (
+ table,
+ ", ".join("%s = ?" % (k) for k in values),
+ " AND ".join("%s = ?" % (k) for k in keyvalues)
+ )
+ sqlargs = values.values() + keyvalues.values()
+ logger.debug(
+ "[SQL] %s Args=%s",
+ sql, sqlargs,
+ )
+
+ txn.execute(sql, sqlargs)
+ if txn.rowcount == 0:
+ # We didn't update and rows so insert a new one
+ allvalues = {}
+ allvalues.update(keyvalues)
+ allvalues.update(values)
+
+ sql = "INSERT INTO %s (%s) VALUES (%s)" % (
+ table,
+ ", ".join(k for k in allvalues),
+ ", ".join("?" for _ in allvalues)
+ )
+ logger.debug(
+ "[SQL] %s Args=%s",
+ sql, keyvalues.values(),
+ )
+ txn.execute(sql, allvalues.values())
+
+
+
def _simple_select_one(self, table, keyvalues, retcols,
allow_none=False):
"""Executes a SELECT query on the named table, which is expected to
diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py
index a858e46f3b..deabd9cd2e 100644
--- a/synapse/storage/pusher.py
+++ b/synapse/storage/pusher.py
@@ -28,16 +28,48 @@ logger = logging.getLogger(__name__)
class PusherStore(SQLBaseStore):
@defer.inlineCallbacks
- def get_all_pushers_after_id(self, min_id):
+ def get_pushers_by_app_id_and_pushkey(self, app_id_and_pushkey):
sql = (
- "SELECT id, user_name, kind, app_id, app_instance_id,"
+ "SELECT id, user_name, kind, app_id,"
"app_display_name, device_display_name, pushkey, data, "
"last_token, last_success, failing_since "
"FROM pushers "
- "WHERE id > ?"
+ "WHERE app_id = ? AND pushkey = ?"
)
- rows = yield self._execute(None, sql, min_id)
+ rows = yield self._execute(
+ None, sql, app_id_and_pushkey[0], app_id_and_pushkey[1]
+ )
+
+ ret = [
+ {
+ "id": r[0],
+ "user_name": r[1],
+ "kind": r[2],
+ "app_id": r[3],
+ "app_display_name": r[4],
+ "device_display_name": r[5],
+ "pushkey": r[6],
+ "data": r[7],
+ "last_token": r[8],
+ "last_success": r[9],
+ "failing_since": r[10]
+ }
+ for r in rows
+ ]
+
+ defer.returnValue(ret[0])
+
+ @defer.inlineCallbacks
+ def get_all_pushers(self):
+ sql = (
+ "SELECT id, user_name, kind, app_id,"
+ "app_display_name, device_display_name, pushkey, data, "
+ "last_token, last_success, failing_since "
+ "FROM pushers"
+ )
+
+ rows = yield self._execute(None, sql)
ret = [
{
@@ -45,14 +77,13 @@ class PusherStore(SQLBaseStore):
"user_name": r[1],
"kind": r[2],
"app_id": r[3],
- "app_instance_id": r[4],
- "app_display_name": r[5],
- "device_display_name": r[6],
- "pushkey": r[7],
- "data": r[8],
- "last_token": r[9],
- "last_success": r[10],
- "failing_since": r[11]
+ "app_display_name": r[4],
+ "device_display_name": r[5],
+ "pushkey": r[6],
+ "data": r[7],
+ "last_token": r[8],
+ "last_success": r[9],
+ "failing_since": r[10]
}
for r in rows
]
@@ -60,21 +91,22 @@ class PusherStore(SQLBaseStore):
defer.returnValue(ret)
@defer.inlineCallbacks
- def add_pusher(self, user_name, kind, app_id, app_instance_id,
+ def add_pusher(self, user_name, kind, app_id,
app_display_name, device_display_name, pushkey, data):
try:
- yield self._simple_insert(PushersTable.table_name, dict(
- user_name=user_name,
- kind=kind,
- app_id=app_id,
- app_instance_id=app_instance_id,
- app_display_name=app_display_name,
- device_display_name=device_display_name,
- pushkey=pushkey,
- data=data
- ))
- except IntegrityError:
- raise StoreError(409, "Pushkey in use.")
+ yield self._simple_upsert(
+ PushersTable.table_name,
+ dict(
+ app_id=app_id,
+ pushkey=pushkey,
+ ),
+ dict(
+ user_name=user_name,
+ kind=kind,
+ app_display_name=app_display_name,
+ device_display_name=device_display_name,
+ data=data
+ ))
except Exception as e:
logger.error("create_pusher with failed: %s", e)
raise StoreError(500, "Problem creating pusher.")
@@ -113,7 +145,6 @@ class PushersTable(Table):
"user_name",
"kind",
"app_id",
- "app_instance_id",
"app_display_name",
"device_display_name",
"pushkey",
diff --git a/synapse/storage/schema/delta/v7.sql b/synapse/storage/schema/delta/v7.sql
index b60aeda756..799e48d780 100644
--- a/synapse/storage/schema/delta/v7.sql
+++ b/synapse/storage/schema/delta/v7.sql
@@ -18,7 +18,6 @@ CREATE TABLE IF NOT EXISTS pushers (
user_name TEXT NOT NULL,
kind varchar(8) NOT NULL,
app_id varchar(64) NOT NULL,
- app_instance_id varchar(64) NOT NULL,
app_display_name varchar(64) NOT NULL,
device_display_name varchar(128) NOT NULL,
pushkey blob NOT NULL,
@@ -27,5 +26,5 @@ CREATE TABLE IF NOT EXISTS pushers (
last_success BIGINT,
failing_since BIGINT,
FOREIGN KEY(user_name) REFERENCES users(name),
- UNIQUE (user_name, pushkey)
+ UNIQUE (app_id, pushkey)
);
diff --git a/synapse/storage/schema/pusher.sql b/synapse/storage/schema/pusher.sql
index b60aeda756..799e48d780 100644
--- a/synapse/storage/schema/pusher.sql
+++ b/synapse/storage/schema/pusher.sql
@@ -18,7 +18,6 @@ CREATE TABLE IF NOT EXISTS pushers (
user_name TEXT NOT NULL,
kind varchar(8) NOT NULL,
app_id varchar(64) NOT NULL,
- app_instance_id varchar(64) NOT NULL,
app_display_name varchar(64) NOT NULL,
device_display_name varchar(128) NOT NULL,
pushkey blob NOT NULL,
@@ -27,5 +26,5 @@ CREATE TABLE IF NOT EXISTS pushers (
last_success BIGINT,
failing_since BIGINT,
FOREIGN KEY(user_name) REFERENCES users(name),
- UNIQUE (user_name, pushkey)
+ UNIQUE (app_id, pushkey)
);
|