diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py
index 63d1af4e86..39b7881c40 100644
--- a/synapse/storage/appservice.py
+++ b/synapse/storage/appservice.py
@@ -355,11 +355,11 @@ class ApplicationServiceTransactionStore(SQLBaseStore):
# being sent)
last_txn_id = self._get_last_txn(txn, service.id)
- result = txn.execute(
+ txn.execute(
"SELECT MAX(txn_id) FROM application_services_txns WHERE as_id=?",
(service.id,)
)
- highest_txn_id = result.fetchone()[0]
+ highest_txn_id = txn.fetchone()[0]
if highest_txn_id is None:
highest_txn_id = 0
@@ -441,15 +441,17 @@ class ApplicationServiceTransactionStore(SQLBaseStore):
def _get_oldest_unsent_txn(self, txn, service):
# Monotonically increasing txn ids, so just select the smallest
# one in the txns table (we delete them when they are sent)
- result = txn.execute(
- "SELECT MIN(txn_id), * FROM application_services_txns WHERE as_id=?",
+ txn.execute(
+ "SELECT * FROM application_services_txns WHERE as_id=?"
+ " ORDER BY txn_id ASC LIMIT 1",
(service.id,)
)
- entry = self.cursor_to_dict(result)[0]
- if not entry or entry["txn_id"] is None:
- # the min(txn_id) part will force a row, so entry may not be None
+ rows = self.cursor_to_dict(txn)
+ if not rows:
return None
+ entry = rows[0]
+
event_ids = json.loads(entry["event_ids"])
events = self._get_events_txn(txn, event_ids)
@@ -458,11 +460,11 @@ class ApplicationServiceTransactionStore(SQLBaseStore):
)
def _get_last_txn(self, txn, service_id):
- result = txn.execute(
+ txn.execute(
"SELECT last_txn FROM application_services_state WHERE as_id=?",
(service_id,)
)
- last_txn_id = result.fetchone()
+ last_txn_id = txn.fetchone()
if last_txn_id is None or last_txn_id[0] is None: # no row exists
return 0
else:
diff --git a/synapse/storage/keys.py b/synapse/storage/keys.py
index cbe9339ccf..5bdf497b93 100644
--- a/synapse/storage/keys.py
+++ b/synapse/storage/keys.py
@@ -137,8 +137,13 @@ class KeyStore(SQLBaseStore):
ts_valid_until_ms (int): The time when this json stops being valid.
key_json (bytes): The encoded JSON.
"""
- return self._simple_insert(
+ return self._simple_upsert(
table="server_keys_json",
+ keyvalues={
+ "server_name": server_name,
+ "key_id": key_id,
+ "from_server": from_server,
+ },
values={
"server_name": server_name,
"key_id": key_id,
@@ -147,7 +152,6 @@ class KeyStore(SQLBaseStore):
"ts_valid_until_ms": ts_expires_ms,
"key_json": buffer(key_json_bytes),
},
- or_replace=True,
)
def get_server_keys_json(self, server_keys):
diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py
index 2582a1da66..08ea62681b 100644
--- a/synapse/storage/pusher.py
+++ b/synapse/storage/pusher.py
@@ -21,34 +21,62 @@ from synapse.api.errors import StoreError
from syutil.jsonutil import encode_canonical_json
import logging
+import simplejson as json
+import types
logger = logging.getLogger(__name__)
class PusherStore(SQLBaseStore):
+ def _decode_pushers_rows(self, rows):
+ for r in rows:
+ dataJson = r['data']
+ r['data'] = None
+ try:
+ if isinstance(dataJson, types.BufferType):
+ dataJson = str(dataJson).decode("UTF8")
+
+ r['data'] = json.loads(dataJson)
+ except Exception as e:
+ logger.warn(
+ "Invalid JSON in data for pusher %d: %s, %s",
+ r['id'], dataJson, e.message,
+ )
+ pass
+
+ if isinstance(r['pushkey'], types.BufferType):
+ r['pushkey'] = str(r['pushkey']).decode("UTF8")
+
+ return rows
+
@defer.inlineCallbacks
def get_pushers_by_app_id_and_pushkey(self, app_id, pushkey):
- sql = (
- "SELECT * FROM pushers "
- "WHERE app_id = ? AND pushkey = ?"
- )
+ def r(txn):
+ sql = (
+ "SELECT * FROM pushers"
+ " WHERE app_id = ? AND pushkey = ?"
+ )
- rows = yield self._execute_and_decode(
- "get_pushers_by_app_id_and_pushkey",
- sql,
- app_id, pushkey
+ txn.execute(sql, (app_id, pushkey,))
+ rows = self.cursor_to_dict(txn)
+
+ return self._decode_pushers_rows(rows)
+
+ rows = yield self.runInteraction(
+ "get_pushers_by_app_id_and_pushkey", r
)
defer.returnValue(rows)
@defer.inlineCallbacks
def get_all_pushers(self):
- sql = (
- "SELECT * FROM pushers"
- )
+ def get_pushers(txn):
+ txn.execute("SELECT * FROM pushers")
+ rows = self.cursor_to_dict(txn)
- rows = yield self._execute_and_decode("get_all_pushers", sql)
+ return self._decode_pushers_rows(rows)
+ rows = yield self.runInteraction("get_all_pushers", get_pushers)
defer.returnValue(rows)
@defer.inlineCallbacks
@@ -72,7 +100,7 @@ class PusherStore(SQLBaseStore):
device_display_name=device_display_name,
ts=pushkey_ts,
lang=lang,
- data=encode_canonical_json(data).decode("UTF-8"),
+ data=encode_canonical_json(data),
),
insertion_values=dict(
id=next_id,
diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py
index a986c4816e..026ba217d6 100644
--- a/synapse/storage/registration.py
+++ b/synapse/storage/registration.py
@@ -181,7 +181,7 @@ class RegistrationStore(SQLBaseStore):
@defer.inlineCallbacks
def user_add_threepid(self, user_id, medium, address, validated_at, added_at):
yield self._simple_upsert("user_threepids", {
- "user": user_id,
+ "user_id": user_id,
"medium": medium,
"address": address,
}, {
@@ -193,7 +193,7 @@ class RegistrationStore(SQLBaseStore):
def user_get_threepids(self, user_id):
ret = yield self._simple_select_list(
"user_threepids", {
- "user": user_id
+ "user_id": user_id
},
['medium', 'address', 'validated_at', 'added_at'],
'user_get_threepids'
diff --git a/synapse/storage/room.py b/synapse/storage/room.py
index 48ebb33057..78572bbdd2 100644
--- a/synapse/storage/room.py
+++ b/synapse/storage/room.py
@@ -75,6 +75,16 @@ class RoomStore(SQLBaseStore):
allow_none=True,
)
+ def get_public_room_ids(self):
+ return self._simple_select_onecol(
+ table="rooms",
+ keyvalues={
+ "is_public": True,
+ },
+ retcol="room_id",
+ desc="get_public_room_ids",
+ )
+
@defer.inlineCallbacks
def get_rooms(self, is_public):
"""Retrieve a list of all public rooms.
@@ -186,14 +196,13 @@ class RoomStore(SQLBaseStore):
sql = (
"SELECT e.*, (%(redacted)s) AS redacted FROM events as e "
"INNER JOIN current_state_events as c ON e.event_id = c.event_id "
- "INNER JOIN state_events as s ON e.event_id = s.event_id "
"WHERE c.room_id = ? "
) % {
"redacted": del_sql,
}
- sql += " AND ((s.type = 'm.room.name' AND s.state_key = '')"
- sql += " OR s.type = 'm.room.aliases')"
+ sql += " AND ((c.type = 'm.room.name' AND c.state_key = '')"
+ sql += " OR c.type = 'm.room.aliases')"
args = (room_id,)
results = yield self._execute_and_decode("get_current_state", sql, *args)
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 8ea5756d61..831169e220 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -65,6 +65,7 @@ class RoomMemberStore(SQLBaseStore):
)
self.get_rooms_for_user.invalidate(target_user_id)
+ self.get_joined_hosts_for_room.invalidate(event.room_id)
def get_room_member(self, user_id, room_id):
"""Retrieve the current state of a room member.
@@ -162,6 +163,7 @@ class RoomMemberStore(SQLBaseStore):
RoomsForUser(**r) for r in self.cursor_to_dict(txn)
]
+ @cached()
def get_joined_hosts_for_room(self, room_id):
return self.runInteraction(
"get_joined_hosts_for_room",
diff --git a/synapse/storage/schema/delta/17/user_threepids.sql b/synapse/storage/schema/delta/17/user_threepids.sql
new file mode 100644
index 0000000000..c17715ac80
--- /dev/null
+++ b/synapse/storage/schema/delta/17/user_threepids.sql
@@ -0,0 +1,9 @@
+CREATE TABLE user_threepids (
+ user_id TEXT NOT NULL,
+ medium TEXT NOT NULL,
+ address TEXT NOT NULL,
+ validated_at BIGINT NOT NULL,
+ added_at BIGINT NOT NULL,
+ CONSTRAINT user_medium_address UNIQUE (user_id, medium, address)
+);
+CREATE INDEX user_threepids_user_id ON user_threepids(user_id);
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index 553ba9dd1f..95bc15c0dc 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -128,25 +128,18 @@ class StateStore(SQLBaseStore):
@defer.inlineCallbacks
def get_current_state(self, room_id, event_type=None, state_key=""):
- del_sql = (
- "SELECT event_id FROM redactions WHERE redacts = e.event_id "
- "LIMIT 1"
- )
-
sql = (
- "SELECT e.*, (%(redacted)s) AS redacted FROM events as e "
- "INNER JOIN current_state_events as c ON e.event_id = c.event_id "
- "INNER JOIN state_events as s ON e.event_id = s.event_id "
- "WHERE c.room_id = ? "
- ) % {
- "redacted": del_sql,
- }
+ "SELECT e.*, r.event_id FROM events as e"
+ " LEFT JOIN redactions as r ON r.redacts = e.event_id"
+ " INNER JOIN current_state_events as c ON e.event_id = c.event_id"
+ " WHERE c.room_id = ? "
+ )
if event_type and state_key is not None:
- sql += " AND s.type = ? AND s.state_key = ? "
+ sql += " AND c.type = ? AND c.state_key = ? "
args = (room_id, event_type, state_key)
elif event_type:
- sql += " AND s.type = ?"
+ sql += " AND c.type = ?"
args = (room_id, event_type)
else:
args = (room_id, )
diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py
index 9d461d5e96..e40eb8a8c4 100644
--- a/synapse/storage/util/id_generators.py
+++ b/synapse/storage/util/id_generators.py
@@ -30,15 +30,13 @@ class IdGenerator(object):
@defer.inlineCallbacks
def get_next(self):
- with self._lock:
- if not self._next_id:
- res = yield self.store._execute_and_decode(
- "IdGenerator_%s" % (self.table,),
- "SELECT MAX(%s) as mx FROM %s" % (self.column, self.table,)
- )
-
- self._next_id = (res and res[0] and res[0]["mx"]) or 1
+ if self._next_id is None:
+ yield self.store.runInteraction(
+ "IdGenerator_%s" % (self.table,),
+ self.get_next_txn,
+ )
+ with self._lock:
i = self._next_id
self._next_id += 1
defer.returnValue(i)
@@ -86,10 +84,10 @@ class StreamIdGenerator(object):
with stream_id_gen.get_next_txn(txn) as stream_id:
# ... persist event ...
"""
- with self._lock:
- if not self._current_max:
- self._compute_current_max(txn)
+ if not self._current_max:
+ self._get_or_compute_current_max(txn)
+ with self._lock:
self._current_max += 1
next_id = self._current_max
@@ -110,22 +108,24 @@ class StreamIdGenerator(object):
"""Returns the maximum stream id such that all stream ids less than or
equal to it have been successfully persisted.
"""
+ if not self._current_max:
+ yield store.runInteraction(
+ "_compute_current_max",
+ self._get_or_compute_current_max,
+ )
+
with self._lock:
if self._unfinished_ids:
defer.returnValue(self._unfinished_ids[0] - 1)
- if not self._current_max:
- yield store.runInteraction(
- "_compute_current_max",
- self._compute_current_max,
- )
-
defer.returnValue(self._current_max)
- def _compute_current_max(self, txn):
- txn.execute("SELECT MAX(stream_ordering) FROM events")
- val, = txn.fetchone()
+ def _get_or_compute_current_max(self, txn):
+ with self._lock:
+ txn.execute("SELECT MAX(stream_ordering) FROM events")
+ rows = txn.fetchall()
+ val, = rows[0]
- self._current_max = int(val) if val else 1
+ self._current_max = int(val) if val else 1
- return self._current_max
+ return self._current_max
|