diff --git a/synapse/api/auth.py b/synapse/api/auth.py
index 77322a5c10..beafa51662 100644
--- a/synapse/api/auth.py
+++ b/synapse/api/auth.py
@@ -360,7 +360,7 @@ class Auth(object):
default=[""]
)[0]
if user and access_token and ip_addr:
- yield self.store.insert_client_ip(
+ self.store.insert_client_ip(
user=user,
access_token=access_token,
device_id=user_info["device_id"],
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 25c0014f97..2b46188c91 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -417,13 +417,13 @@ class FederationServer(FederationBase):
pdu.internal_metadata.outlier = True
elif min_depth and pdu.depth > min_depth:
if get_missing and prevs - seen:
- latest_tuples = yield self.store.get_latest_events_in_room(
+ latest = yield self.store.get_latest_event_ids_in_room(
pdu.room_id
)
# We add the prev events that we have seen to the latest
# list to ensure the remote server doesn't give them to us
- latest = set(e_id for e_id, _, _ in latest_tuples)
+ latest = set(latest)
latest |= seen
missing_events = yield self.get_missing_events(
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index 492a630fdc..355ab317df 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -180,7 +180,7 @@ class ApplicationServicesHandler(object):
return
user_info = yield self.store.get_user_by_id(user_id)
- if len(user_info) > 0:
+ if not user_info:
defer.returnValue(False)
return
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index 2e8009d3c3..4e2e50345e 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -159,7 +159,7 @@ class AuthHandler(BaseHandler):
logger.warn("Attempted to login as %s but they do not exist", user)
raise LoginError(401, "", errcode=Codes.UNAUTHORIZED)
- stored_hash = user_info[0]["password_hash"]
+ stored_hash = user_info["password_hash"]
if bcrypt.checkpw(password, stored_hash):
defer.returnValue(user)
else:
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 47456a28e9..cfa2e38ed2 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -529,11 +529,19 @@ class RoomListHandler(BaseHandler):
@defer.inlineCallbacks
def get_public_room_list(self):
chunk = yield self.store.get_rooms(is_public=True)
- for room in chunk:
- joined_users = yield self.store.get_users_in_room(
- room_id=room["room_id"],
- )
- room["num_joined_members"] = len(joined_users)
+ results = yield defer.gatherResults(
+ [
+ self.store.get_users_in_room(
+ room_id=room["room_id"],
+ )
+ for room in chunk
+ ],
+ consumeErrors=True,
+ )
+
+ for i, room in enumerate(chunk):
+ room["num_joined_members"] = len(results[i])
+
# FIXME (erikj): START is no longer a valid value
defer.returnValue({"start": "START", "end": "END", "chunk": chunk})
diff --git a/synapse/state.py b/synapse/state.py
index ba2500d61c..9dddb77d5b 100644
--- a/synapse/state.py
+++ b/synapse/state.py
@@ -86,12 +86,7 @@ class StateHandler(object):
If `event_type` is specified, then the method returns only the one
event (or None) with that `event_type` and `state_key`.
"""
- events = yield self.store.get_latest_events_in_room(room_id)
-
- event_ids = [
- e_id
- for e_id, _, _ in events
- ]
+ event_ids = yield self.store.get_latest_event_ids_in_room(room_id)
cache = None
if self._state_cache is not None:
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index 2773b2cb13..0cc14fb692 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -104,6 +104,8 @@ class DataStore(RoomMemberStore, RoomStore,
self.client_ip_last_seen.prefill(*key + (now,))
+ # It's safe not to lock here: a) no unique constraint,
+ # b) LAST_SEEN_GRANULARITY makes concurrent updates incredibly unlikely
yield self._simple_upsert(
"user_ips",
keyvalues={
@@ -117,6 +119,7 @@ class DataStore(RoomMemberStore, RoomStore,
"last_seen": now,
},
desc="insert_client_ip",
+ lock=False,
)
def get_user_ip_and_agents(self, user):
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 6017c2a6e8..c328b5274c 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -301,6 +301,7 @@ class SQLBaseStore(object):
def inner_func(conn, *args, **kwargs):
with LoggingContext("runInteraction") as context:
if self.database_engine.is_connection_closed(conn):
+ logger.debug("Reconnecting closed database connection")
conn.reconnect()
current_context.copy_to(context)
@@ -451,7 +452,7 @@ class SQLBaseStore(object):
txn.execute(sql, values.values())
def _simple_upsert(self, table, keyvalues, values,
- insertion_values={}, desc="_simple_upsert"):
+ insertion_values={}, desc="_simple_upsert", lock=True):
"""
Args:
table (str): The table to upsert into
@@ -463,11 +464,14 @@ class SQLBaseStore(object):
return self.runInteraction(
desc,
self._simple_upsert_txn, table, keyvalues, values, insertion_values,
+ lock
)
- def _simple_upsert_txn(self, txn, table, keyvalues, values, insertion_values={}):
- # We need to lock the table :(
- self.database_engine.lock_table(txn, table)
+ def _simple_upsert_txn(self, txn, table, keyvalues, values, insertion_values={},
+ lock=True):
+ # We need to lock the table :(, unless we're *really* careful
+ if lock:
+ self.database_engine.lock_table(txn, table)
# Try to update
sql = "UPDATE %s SET %s WHERE %s" % (
diff --git a/synapse/storage/engines/postgres.py b/synapse/storage/engines/postgres.py
index 7125f66f01..64e34265f6 100644
--- a/synapse/storage/engines/postgres.py
+++ b/synapse/storage/engines/postgres.py
@@ -53,7 +53,7 @@ class PostgresEngine(object):
return False
def is_connection_closed(self, conn):
- return bool(conn)
+ return bool(conn.closed)
def lock_table(self, txn, table):
txn.execute("LOCK TABLE %s in EXCLUSIVE MODE" % (table,))
diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py
index 54a3c9d805..fbbcce754b 100644
--- a/synapse/storage/event_federation.py
+++ b/synapse/storage/event_federation.py
@@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from ._base import SQLBaseStore
+from ._base import SQLBaseStore, cached
from syutil.base64util import encode_base64
import logging
@@ -96,11 +96,23 @@ class EventFederationStore(SQLBaseStore):
room_id,
)
+ @cached()
+ def get_latest_event_ids_in_room(self, room_id):
+ return self._simple_select_onecol(
+ table="event_forward_extremities",
+ keyvalues={
+ "room_id": room_id,
+ },
+ retcol="event_id",
+ desc="get_latest_events_in_room",
+ )
+
def _get_latest_events_in_room(self, txn, room_id):
sql = (
"SELECT e.event_id, e.depth FROM events as e "
"INNER JOIN event_forward_extremities as f "
"ON e.event_id = f.event_id "
+ "AND e.room_id = f.room_id "
"WHERE f.room_id = ?"
)
@@ -318,6 +330,8 @@ class EventFederationStore(SQLBaseStore):
)
txn.execute(query)
+ self.get_latest_event_ids_in_room.invalidate(room_id)
+
def get_backfill_events(self, room_id, event_list, limit):
"""Get a list of Events for a given topic that occurred before (and
including) the events in event_list. Return a list of max size `limit`
diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py
index 026ba217d6..ff0a2a9e8b 100644
--- a/synapse/storage/registration.py
+++ b/synapse/storage/registration.py
@@ -112,14 +112,10 @@ class RegistrationStore(SQLBaseStore):
@defer.inlineCallbacks
def user_delete_access_tokens_apart_from(self, user_id, token_id):
- rows = yield self.get_user_by_id(user_id)
- if len(rows) == 0:
- raise Exception("No such user!")
-
yield self._execute(
"delete_access_tokens_apart_from", None,
"DELETE FROM access_tokens WHERE user_id = ? AND id != ?",
- rows[0]['id'], token_id
+ user_id, token_id
)
@defer.inlineCallbacks
diff --git a/synapse/storage/room.py b/synapse/storage/room.py
index 78572bbdd2..f956377632 100644
--- a/synapse/storage/room.py
+++ b/synapse/storage/room.py
@@ -188,26 +188,21 @@ class RoomStore(SQLBaseStore):
@defer.inlineCallbacks
def get_room_name_and_aliases(self, room_id):
- del_sql = (
- "SELECT event_id FROM redactions WHERE redacts = e.event_id "
- "LIMIT 1"
- )
+ def f(txn):
+ sql = (
+ "SELECT event_id FROM current_state_events "
+ "WHERE room_id = ? "
+ )
- sql = (
- "SELECT e.*, (%(redacted)s) AS redacted FROM events as e "
- "INNER JOIN current_state_events as c ON e.event_id = c.event_id "
- "WHERE c.room_id = ? "
- ) % {
- "redacted": del_sql,
- }
+ sql += " AND ((type = 'm.room.name' AND state_key = '')"
+ sql += " OR type = 'm.room.aliases')"
- sql += " AND ((c.type = 'm.room.name' AND c.state_key = '')"
- sql += " OR c.type = 'm.room.aliases')"
- args = (room_id,)
+ txn.execute(sql, (room_id,))
+ results = self.cursor_to_dict(txn)
- results = yield self._execute_and_decode("get_current_state", sql, *args)
+ return self._parse_events_txn(txn, results)
- events = yield self._parse_events(results)
+ events = yield self.runInteraction("get_room_name_and_aliases", f)
name = None
aliases = []
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 831169e220..09fb77a194 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -154,7 +154,9 @@ class RoomMemberStore(SQLBaseStore):
"SELECT m.room_id, m.sender, m.membership"
" FROM room_memberships as m"
" INNER JOIN current_state_events as c"
- " ON m.event_id = c.event_id"
+ " ON m.event_id = c.event_id "
+ " AND m.room_id = c.room_id "
+ " AND m.user_id = c.state_key"
" WHERE %s"
) % (where_clause,)
@@ -212,7 +214,9 @@ class RoomMemberStore(SQLBaseStore):
sql = (
"SELECT m.* FROM room_memberships as m"
" INNER JOIN current_state_events as c"
- " ON m.event_id = c.event_id"
+ " ON m.event_id = c.event_id "
+ " AND m.room_id = c.room_id "
+ " AND m.user_id = c.state_key"
" WHERE %(where)s"
) % {
"where": where_clause,
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index 95bc15c0dc..7e55e8bed6 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -128,25 +128,27 @@ class StateStore(SQLBaseStore):
@defer.inlineCallbacks
def get_current_state(self, room_id, event_type=None, state_key=""):
- sql = (
- "SELECT e.*, r.event_id FROM events as e"
- " LEFT JOIN redactions as r ON r.redacts = e.event_id"
- " INNER JOIN current_state_events as c ON e.event_id = c.event_id"
- " WHERE c.room_id = ? "
- )
+ def f(txn):
+ sql = (
+ "SELECT event_id FROM current_state_events"
+ " WHERE room_id = ? "
+ )
+
+ if event_type and state_key is not None:
+ sql += " AND type = ? AND state_key = ? "
+ args = (room_id, event_type, state_key)
+ elif event_type:
+ sql += " AND type = ?"
+ args = (room_id, event_type)
+ else:
+ args = (room_id, )
- if event_type and state_key is not None:
- sql += " AND c.type = ? AND c.state_key = ? "
- args = (room_id, event_type, state_key)
- elif event_type:
- sql += " AND c.type = ?"
- args = (room_id, event_type)
- else:
- args = (room_id, )
+ txn.execute(sql, args)
+ results = self.cursor_to_dict(txn)
- results = yield self._execute_and_decode("get_current_state", sql, *args)
+ return self._parse_events_txn(txn, results)
- events = yield self._parse_events(results)
+ events = yield self.runInteraction("get_current_state", f)
defer.returnValue(events)
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index df6de7cbcd..280d4ad605 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -149,7 +149,8 @@ class StreamStore(SQLBaseStore):
# select all the events between from/to with a sensible limit
sql = (
"SELECT e.event_id, e.room_id, e.type, s.state_key, "
- "e.stream_ordering FROM events AS e LEFT JOIN state_events as s ON "
+ "e.stream_ordering FROM events AS e "
+ "LEFT JOIN state_events as s ON "
"e.event_id = s.event_id "
"WHERE e.stream_ordering > ? AND e.stream_ordering <= ? "
"ORDER BY stream_ordering ASC LIMIT %(limit)d "
@@ -214,8 +215,9 @@ class StreamStore(SQLBaseStore):
current_room_membership_sql = (
"SELECT m.room_id FROM room_memberships as m "
- "INNER JOIN current_state_events as c ON m.event_id = c.event_id "
- "WHERE m.user_id = ? AND m.membership = 'join'"
+ " INNER JOIN current_state_events as c"
+ " ON m.event_id = c.event_id AND c.state_key = m.user_id"
+ " WHERE m.user_id = ? AND m.membership = 'join'"
)
# We also want to get any membership events about that user, e.g.
|