diff options
Diffstat (limited to 'synapse/storage')
-rw-r--r-- | synapse/storage/__init__.py | 3 | ||||
-rw-r--r-- | synapse/storage/_base.py | 12 | ||||
-rw-r--r-- | synapse/storage/engines/postgres.py | 2 | ||||
-rw-r--r-- | synapse/storage/event_federation.py | 16 | ||||
-rw-r--r-- | synapse/storage/registration.py | 6 | ||||
-rw-r--r-- | synapse/storage/room.py | 27 | ||||
-rw-r--r-- | synapse/storage/roommember.py | 8 | ||||
-rw-r--r-- | synapse/storage/state.py | 34 | ||||
-rw-r--r-- | synapse/storage/stream.py | 8 |
9 files changed, 68 insertions, 48 deletions
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 2773b2cb13..0cc14fb692 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -104,6 +104,8 @@ class DataStore(RoomMemberStore, RoomStore, self.client_ip_last_seen.prefill(*key + (now,)) + # It's safe not to lock here: a) no unique constraint, + # b) LAST_SEEN_GRANULARITY makes concurrent updates incredibly unlikely yield self._simple_upsert( "user_ips", keyvalues={ @@ -117,6 +119,7 @@ class DataStore(RoomMemberStore, RoomStore, "last_seen": now, }, desc="insert_client_ip", + lock=False, ) def get_user_ip_and_agents(self, user): diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 6017c2a6e8..c328b5274c 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -301,6 +301,7 @@ class SQLBaseStore(object): def inner_func(conn, *args, **kwargs): with LoggingContext("runInteraction") as context: if self.database_engine.is_connection_closed(conn): + logger.debug("Reconnecting closed database connection") conn.reconnect() current_context.copy_to(context) @@ -451,7 +452,7 @@ class SQLBaseStore(object): txn.execute(sql, values.values()) def _simple_upsert(self, table, keyvalues, values, - insertion_values={}, desc="_simple_upsert"): + insertion_values={}, desc="_simple_upsert", lock=True): """ Args: table (str): The table to upsert into @@ -463,11 +464,14 @@ class SQLBaseStore(object): return self.runInteraction( desc, self._simple_upsert_txn, table, keyvalues, values, insertion_values, + lock ) - def _simple_upsert_txn(self, txn, table, keyvalues, values, insertion_values={}): - # We need to lock the table :( - self.database_engine.lock_table(txn, table) + def _simple_upsert_txn(self, txn, table, keyvalues, values, insertion_values={}, + lock=True): + # We need to lock the table :(, unless we're *really* careful + if lock: + self.database_engine.lock_table(txn, table) # Try to update sql = "UPDATE %s SET %s WHERE %s" % ( diff --git a/synapse/storage/engines/postgres.py b/synapse/storage/engines/postgres.py index 7125f66f01..64e34265f6 100644 --- a/synapse/storage/engines/postgres.py +++ b/synapse/storage/engines/postgres.py @@ -53,7 +53,7 @@ class PostgresEngine(object): return False def is_connection_closed(self, conn): - return bool(conn) + return bool(conn.closed) def lock_table(self, txn, table): txn.execute("LOCK TABLE %s in EXCLUSIVE MODE" % (table,)) diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py index 54a3c9d805..fbbcce754b 100644 --- a/synapse/storage/event_federation.py +++ b/synapse/storage/event_federation.py @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from ._base import SQLBaseStore +from ._base import SQLBaseStore, cached from syutil.base64util import encode_base64 import logging @@ -96,11 +96,23 @@ class EventFederationStore(SQLBaseStore): room_id, ) + @cached() + def get_latest_event_ids_in_room(self, room_id): + return self._simple_select_onecol( + table="event_forward_extremities", + keyvalues={ + "room_id": room_id, + }, + retcol="event_id", + desc="get_latest_events_in_room", + ) + def _get_latest_events_in_room(self, txn, room_id): sql = ( "SELECT e.event_id, e.depth FROM events as e " "INNER JOIN event_forward_extremities as f " "ON e.event_id = f.event_id " + "AND e.room_id = f.room_id " "WHERE f.room_id = ?" ) @@ -318,6 +330,8 @@ class EventFederationStore(SQLBaseStore): ) txn.execute(query) + self.get_latest_event_ids_in_room.invalidate(room_id) + def get_backfill_events(self, room_id, event_list, limit): """Get a list of Events for a given topic that occurred before (and including) the events in event_list. Return a list of max size `limit` diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py index 026ba217d6..ff0a2a9e8b 100644 --- a/synapse/storage/registration.py +++ b/synapse/storage/registration.py @@ -112,14 +112,10 @@ class RegistrationStore(SQLBaseStore): @defer.inlineCallbacks def user_delete_access_tokens_apart_from(self, user_id, token_id): - rows = yield self.get_user_by_id(user_id) - if len(rows) == 0: - raise Exception("No such user!") - yield self._execute( "delete_access_tokens_apart_from", None, "DELETE FROM access_tokens WHERE user_id = ? AND id != ?", - rows[0]['id'], token_id + user_id, token_id ) @defer.inlineCallbacks diff --git a/synapse/storage/room.py b/synapse/storage/room.py index 78572bbdd2..f956377632 100644 --- a/synapse/storage/room.py +++ b/synapse/storage/room.py @@ -188,26 +188,21 @@ class RoomStore(SQLBaseStore): @defer.inlineCallbacks def get_room_name_and_aliases(self, room_id): - del_sql = ( - "SELECT event_id FROM redactions WHERE redacts = e.event_id " - "LIMIT 1" - ) + def f(txn): + sql = ( + "SELECT event_id FROM current_state_events " + "WHERE room_id = ? " + ) - sql = ( - "SELECT e.*, (%(redacted)s) AS redacted FROM events as e " - "INNER JOIN current_state_events as c ON e.event_id = c.event_id " - "WHERE c.room_id = ? " - ) % { - "redacted": del_sql, - } + sql += " AND ((type = 'm.room.name' AND state_key = '')" + sql += " OR type = 'm.room.aliases')" - sql += " AND ((c.type = 'm.room.name' AND c.state_key = '')" - sql += " OR c.type = 'm.room.aliases')" - args = (room_id,) + txn.execute(sql, (room_id,)) + results = self.cursor_to_dict(txn) - results = yield self._execute_and_decode("get_current_state", sql, *args) + return self._parse_events_txn(txn, results) - events = yield self._parse_events(results) + events = yield self.runInteraction("get_room_name_and_aliases", f) name = None aliases = [] diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 831169e220..09fb77a194 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -154,7 +154,9 @@ class RoomMemberStore(SQLBaseStore): "SELECT m.room_id, m.sender, m.membership" " FROM room_memberships as m" " INNER JOIN current_state_events as c" - " ON m.event_id = c.event_id" + " ON m.event_id = c.event_id " + " AND m.room_id = c.room_id " + " AND m.user_id = c.state_key" " WHERE %s" ) % (where_clause,) @@ -212,7 +214,9 @@ class RoomMemberStore(SQLBaseStore): sql = ( "SELECT m.* FROM room_memberships as m" " INNER JOIN current_state_events as c" - " ON m.event_id = c.event_id" + " ON m.event_id = c.event_id " + " AND m.room_id = c.room_id " + " AND m.user_id = c.state_key" " WHERE %(where)s" ) % { "where": where_clause, diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 95bc15c0dc..7e55e8bed6 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -128,25 +128,27 @@ class StateStore(SQLBaseStore): @defer.inlineCallbacks def get_current_state(self, room_id, event_type=None, state_key=""): - sql = ( - "SELECT e.*, r.event_id FROM events as e" - " LEFT JOIN redactions as r ON r.redacts = e.event_id" - " INNER JOIN current_state_events as c ON e.event_id = c.event_id" - " WHERE c.room_id = ? " - ) + def f(txn): + sql = ( + "SELECT event_id FROM current_state_events" + " WHERE room_id = ? " + ) + + if event_type and state_key is not None: + sql += " AND type = ? AND state_key = ? " + args = (room_id, event_type, state_key) + elif event_type: + sql += " AND type = ?" + args = (room_id, event_type) + else: + args = (room_id, ) - if event_type and state_key is not None: - sql += " AND c.type = ? AND c.state_key = ? " - args = (room_id, event_type, state_key) - elif event_type: - sql += " AND c.type = ?" - args = (room_id, event_type) - else: - args = (room_id, ) + txn.execute(sql, args) + results = self.cursor_to_dict(txn) - results = yield self._execute_and_decode("get_current_state", sql, *args) + return self._parse_events_txn(txn, results) - events = yield self._parse_events(results) + events = yield self.runInteraction("get_current_state", f) defer.returnValue(events) diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index df6de7cbcd..280d4ad605 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -149,7 +149,8 @@ class StreamStore(SQLBaseStore): # select all the events between from/to with a sensible limit sql = ( "SELECT e.event_id, e.room_id, e.type, s.state_key, " - "e.stream_ordering FROM events AS e LEFT JOIN state_events as s ON " + "e.stream_ordering FROM events AS e " + "LEFT JOIN state_events as s ON " "e.event_id = s.event_id " "WHERE e.stream_ordering > ? AND e.stream_ordering <= ? " "ORDER BY stream_ordering ASC LIMIT %(limit)d " @@ -214,8 +215,9 @@ class StreamStore(SQLBaseStore): current_room_membership_sql = ( "SELECT m.room_id FROM room_memberships as m " - "INNER JOIN current_state_events as c ON m.event_id = c.event_id " - "WHERE m.user_id = ? AND m.membership = 'join'" + " INNER JOIN current_state_events as c" + " ON m.event_id = c.event_id AND c.state_key = m.user_id" + " WHERE m.user_id = ? AND m.membership = 'join'" ) # We also want to get any membership events about that user, e.g. |