diff options
Diffstat (limited to 'synapse/storage')
-rw-r--r-- | synapse/storage/appservice.py | 20 | ||||
-rw-r--r-- | synapse/storage/keys.py | 8 | ||||
-rw-r--r-- | synapse/storage/pusher.py | 54 | ||||
-rw-r--r-- | synapse/storage/registration.py | 4 | ||||
-rw-r--r-- | synapse/storage/room.py | 15 | ||||
-rw-r--r-- | synapse/storage/roommember.py | 2 | ||||
-rw-r--r-- | synapse/storage/schema/delta/17/user_threepids.sql | 9 | ||||
-rw-r--r-- | synapse/storage/state.py | 21 | ||||
-rw-r--r-- | synapse/storage/util/id_generators.py | 44 |
9 files changed, 112 insertions, 65 deletions
diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index 63d1af4e86..39b7881c40 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -355,11 +355,11 @@ class ApplicationServiceTransactionStore(SQLBaseStore): # being sent) last_txn_id = self._get_last_txn(txn, service.id) - result = txn.execute( + txn.execute( "SELECT MAX(txn_id) FROM application_services_txns WHERE as_id=?", (service.id,) ) - highest_txn_id = result.fetchone()[0] + highest_txn_id = txn.fetchone()[0] if highest_txn_id is None: highest_txn_id = 0 @@ -441,15 +441,17 @@ class ApplicationServiceTransactionStore(SQLBaseStore): def _get_oldest_unsent_txn(self, txn, service): # Monotonically increasing txn ids, so just select the smallest # one in the txns table (we delete them when they are sent) - result = txn.execute( - "SELECT MIN(txn_id), * FROM application_services_txns WHERE as_id=?", + txn.execute( + "SELECT * FROM application_services_txns WHERE as_id=?" + " ORDER BY txn_id ASC LIMIT 1", (service.id,) ) - entry = self.cursor_to_dict(result)[0] - if not entry or entry["txn_id"] is None: - # the min(txn_id) part will force a row, so entry may not be None + rows = self.cursor_to_dict(txn) + if not rows: return None + entry = rows[0] + event_ids = json.loads(entry["event_ids"]) events = self._get_events_txn(txn, event_ids) @@ -458,11 +460,11 @@ class ApplicationServiceTransactionStore(SQLBaseStore): ) def _get_last_txn(self, txn, service_id): - result = txn.execute( + txn.execute( "SELECT last_txn FROM application_services_state WHERE as_id=?", (service_id,) ) - last_txn_id = result.fetchone() + last_txn_id = txn.fetchone() if last_txn_id is None or last_txn_id[0] is None: # no row exists return 0 else: diff --git a/synapse/storage/keys.py b/synapse/storage/keys.py index cbe9339ccf..5bdf497b93 100644 --- a/synapse/storage/keys.py +++ b/synapse/storage/keys.py @@ -137,8 +137,13 @@ class KeyStore(SQLBaseStore): ts_valid_until_ms (int): The time when this json stops being valid. key_json (bytes): The encoded JSON. """ - return self._simple_insert( + return self._simple_upsert( table="server_keys_json", + keyvalues={ + "server_name": server_name, + "key_id": key_id, + "from_server": from_server, + }, values={ "server_name": server_name, "key_id": key_id, @@ -147,7 +152,6 @@ class KeyStore(SQLBaseStore): "ts_valid_until_ms": ts_expires_ms, "key_json": buffer(key_json_bytes), }, - or_replace=True, ) def get_server_keys_json(self, server_keys): diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py index 2582a1da66..08ea62681b 100644 --- a/synapse/storage/pusher.py +++ b/synapse/storage/pusher.py @@ -21,34 +21,62 @@ from synapse.api.errors import StoreError from syutil.jsonutil import encode_canonical_json import logging +import simplejson as json +import types logger = logging.getLogger(__name__) class PusherStore(SQLBaseStore): + def _decode_pushers_rows(self, rows): + for r in rows: + dataJson = r['data'] + r['data'] = None + try: + if isinstance(dataJson, types.BufferType): + dataJson = str(dataJson).decode("UTF8") + + r['data'] = json.loads(dataJson) + except Exception as e: + logger.warn( + "Invalid JSON in data for pusher %d: %s, %s", + r['id'], dataJson, e.message, + ) + pass + + if isinstance(r['pushkey'], types.BufferType): + r['pushkey'] = str(r['pushkey']).decode("UTF8") + + return rows + @defer.inlineCallbacks def get_pushers_by_app_id_and_pushkey(self, app_id, pushkey): - sql = ( - "SELECT * FROM pushers " - "WHERE app_id = ? AND pushkey = ?" - ) + def r(txn): + sql = ( + "SELECT * FROM pushers" + " WHERE app_id = ? AND pushkey = ?" + ) - rows = yield self._execute_and_decode( - "get_pushers_by_app_id_and_pushkey", - sql, - app_id, pushkey + txn.execute(sql, (app_id, pushkey,)) + rows = self.cursor_to_dict(txn) + + return self._decode_pushers_rows(rows) + + rows = yield self.runInteraction( + "get_pushers_by_app_id_and_pushkey", r ) defer.returnValue(rows) @defer.inlineCallbacks def get_all_pushers(self): - sql = ( - "SELECT * FROM pushers" - ) + def get_pushers(txn): + txn.execute("SELECT * FROM pushers") + rows = self.cursor_to_dict(txn) - rows = yield self._execute_and_decode("get_all_pushers", sql) + return self._decode_pushers_rows(rows) + rows = yield self.runInteraction("get_all_pushers", get_pushers) defer.returnValue(rows) @defer.inlineCallbacks @@ -72,7 +100,7 @@ class PusherStore(SQLBaseStore): device_display_name=device_display_name, ts=pushkey_ts, lang=lang, - data=encode_canonical_json(data).decode("UTF-8"), + data=encode_canonical_json(data), ), insertion_values=dict( id=next_id, diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py index a986c4816e..026ba217d6 100644 --- a/synapse/storage/registration.py +++ b/synapse/storage/registration.py @@ -181,7 +181,7 @@ class RegistrationStore(SQLBaseStore): @defer.inlineCallbacks def user_add_threepid(self, user_id, medium, address, validated_at, added_at): yield self._simple_upsert("user_threepids", { - "user": user_id, + "user_id": user_id, "medium": medium, "address": address, }, { @@ -193,7 +193,7 @@ class RegistrationStore(SQLBaseStore): def user_get_threepids(self, user_id): ret = yield self._simple_select_list( "user_threepids", { - "user": user_id + "user_id": user_id }, ['medium', 'address', 'validated_at', 'added_at'], 'user_get_threepids' diff --git a/synapse/storage/room.py b/synapse/storage/room.py index 48ebb33057..78572bbdd2 100644 --- a/synapse/storage/room.py +++ b/synapse/storage/room.py @@ -75,6 +75,16 @@ class RoomStore(SQLBaseStore): allow_none=True, ) + def get_public_room_ids(self): + return self._simple_select_onecol( + table="rooms", + keyvalues={ + "is_public": True, + }, + retcol="room_id", + desc="get_public_room_ids", + ) + @defer.inlineCallbacks def get_rooms(self, is_public): """Retrieve a list of all public rooms. @@ -186,14 +196,13 @@ class RoomStore(SQLBaseStore): sql = ( "SELECT e.*, (%(redacted)s) AS redacted FROM events as e " "INNER JOIN current_state_events as c ON e.event_id = c.event_id " - "INNER JOIN state_events as s ON e.event_id = s.event_id " "WHERE c.room_id = ? " ) % { "redacted": del_sql, } - sql += " AND ((s.type = 'm.room.name' AND s.state_key = '')" - sql += " OR s.type = 'm.room.aliases')" + sql += " AND ((c.type = 'm.room.name' AND c.state_key = '')" + sql += " OR c.type = 'm.room.aliases')" args = (room_id,) results = yield self._execute_and_decode("get_current_state", sql, *args) diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 8ea5756d61..831169e220 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -65,6 +65,7 @@ class RoomMemberStore(SQLBaseStore): ) self.get_rooms_for_user.invalidate(target_user_id) + self.get_joined_hosts_for_room.invalidate(event.room_id) def get_room_member(self, user_id, room_id): """Retrieve the current state of a room member. @@ -162,6 +163,7 @@ class RoomMemberStore(SQLBaseStore): RoomsForUser(**r) for r in self.cursor_to_dict(txn) ] + @cached() def get_joined_hosts_for_room(self, room_id): return self.runInteraction( "get_joined_hosts_for_room", diff --git a/synapse/storage/schema/delta/17/user_threepids.sql b/synapse/storage/schema/delta/17/user_threepids.sql new file mode 100644 index 0000000000..c17715ac80 --- /dev/null +++ b/synapse/storage/schema/delta/17/user_threepids.sql @@ -0,0 +1,9 @@ +CREATE TABLE user_threepids ( + user_id TEXT NOT NULL, + medium TEXT NOT NULL, + address TEXT NOT NULL, + validated_at BIGINT NOT NULL, + added_at BIGINT NOT NULL, + CONSTRAINT user_medium_address UNIQUE (user_id, medium, address) +); +CREATE INDEX user_threepids_user_id ON user_threepids(user_id); diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 553ba9dd1f..95bc15c0dc 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -128,25 +128,18 @@ class StateStore(SQLBaseStore): @defer.inlineCallbacks def get_current_state(self, room_id, event_type=None, state_key=""): - del_sql = ( - "SELECT event_id FROM redactions WHERE redacts = e.event_id " - "LIMIT 1" - ) - sql = ( - "SELECT e.*, (%(redacted)s) AS redacted FROM events as e " - "INNER JOIN current_state_events as c ON e.event_id = c.event_id " - "INNER JOIN state_events as s ON e.event_id = s.event_id " - "WHERE c.room_id = ? " - ) % { - "redacted": del_sql, - } + "SELECT e.*, r.event_id FROM events as e" + " LEFT JOIN redactions as r ON r.redacts = e.event_id" + " INNER JOIN current_state_events as c ON e.event_id = c.event_id" + " WHERE c.room_id = ? " + ) if event_type and state_key is not None: - sql += " AND s.type = ? AND s.state_key = ? " + sql += " AND c.type = ? AND c.state_key = ? " args = (room_id, event_type, state_key) elif event_type: - sql += " AND s.type = ?" + sql += " AND c.type = ?" args = (room_id, event_type) else: args = (room_id, ) diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py index 9d461d5e96..e40eb8a8c4 100644 --- a/synapse/storage/util/id_generators.py +++ b/synapse/storage/util/id_generators.py @@ -30,15 +30,13 @@ class IdGenerator(object): @defer.inlineCallbacks def get_next(self): - with self._lock: - if not self._next_id: - res = yield self.store._execute_and_decode( - "IdGenerator_%s" % (self.table,), - "SELECT MAX(%s) as mx FROM %s" % (self.column, self.table,) - ) - - self._next_id = (res and res[0] and res[0]["mx"]) or 1 + if self._next_id is None: + yield self.store.runInteraction( + "IdGenerator_%s" % (self.table,), + self.get_next_txn, + ) + with self._lock: i = self._next_id self._next_id += 1 defer.returnValue(i) @@ -86,10 +84,10 @@ class StreamIdGenerator(object): with stream_id_gen.get_next_txn(txn) as stream_id: # ... persist event ... """ - with self._lock: - if not self._current_max: - self._compute_current_max(txn) + if not self._current_max: + self._get_or_compute_current_max(txn) + with self._lock: self._current_max += 1 next_id = self._current_max @@ -110,22 +108,24 @@ class StreamIdGenerator(object): """Returns the maximum stream id such that all stream ids less than or equal to it have been successfully persisted. """ + if not self._current_max: + yield store.runInteraction( + "_compute_current_max", + self._get_or_compute_current_max, + ) + with self._lock: if self._unfinished_ids: defer.returnValue(self._unfinished_ids[0] - 1) - if not self._current_max: - yield store.runInteraction( - "_compute_current_max", - self._compute_current_max, - ) - defer.returnValue(self._current_max) - def _compute_current_max(self, txn): - txn.execute("SELECT MAX(stream_ordering) FROM events") - val, = txn.fetchone() + def _get_or_compute_current_max(self, txn): + with self._lock: + txn.execute("SELECT MAX(stream_ordering) FROM events") + rows = txn.fetchall() + val, = rows[0] - self._current_max = int(val) if val else 1 + self._current_max = int(val) if val else 1 - return self._current_max + return self._current_max |