summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/__init__.py3
-rw-r--r--synapse/storage/_base.py12
-rw-r--r--synapse/storage/engines/postgres.py2
-rw-r--r--synapse/storage/event_federation.py16
-rw-r--r--synapse/storage/registration.py6
-rw-r--r--synapse/storage/room.py27
-rw-r--r--synapse/storage/roommember.py8
-rw-r--r--synapse/storage/state.py34
-rw-r--r--synapse/storage/stream.py8
9 files changed, 68 insertions, 48 deletions
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index 2773b2cb13..0cc14fb692 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -104,6 +104,8 @@ class DataStore(RoomMemberStore, RoomStore,
 
         self.client_ip_last_seen.prefill(*key + (now,))
 
+        # It's safe not to lock here: a) no unique constraint,
+        # b) LAST_SEEN_GRANULARITY makes concurrent updates incredibly unlikely
         yield self._simple_upsert(
             "user_ips",
             keyvalues={
@@ -117,6 +119,7 @@ class DataStore(RoomMemberStore, RoomStore,
                 "last_seen": now,
             },
             desc="insert_client_ip",
+            lock=False,
         )
 
     def get_user_ip_and_agents(self, user):
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 6017c2a6e8..c328b5274c 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -301,6 +301,7 @@ class SQLBaseStore(object):
         def inner_func(conn, *args, **kwargs):
             with LoggingContext("runInteraction") as context:
                 if self.database_engine.is_connection_closed(conn):
+                    logger.debug("Reconnecting closed database connection")
                     conn.reconnect()
 
                 current_context.copy_to(context)
@@ -451,7 +452,7 @@ class SQLBaseStore(object):
         txn.execute(sql, values.values())
 
     def _simple_upsert(self, table, keyvalues, values,
-                       insertion_values={}, desc="_simple_upsert"):
+                       insertion_values={}, desc="_simple_upsert", lock=True):
         """
         Args:
             table (str): The table to upsert into
@@ -463,11 +464,14 @@ class SQLBaseStore(object):
         return self.runInteraction(
             desc,
             self._simple_upsert_txn, table, keyvalues, values, insertion_values,
+            lock
         )
 
-    def _simple_upsert_txn(self, txn, table, keyvalues, values, insertion_values={}):
-        # We need to lock the table :(
-        self.database_engine.lock_table(txn, table)
+    def _simple_upsert_txn(self, txn, table, keyvalues, values, insertion_values={},
+                           lock=True):
+        # We need to lock the table :(, unless we're *really* careful
+        if lock:
+            self.database_engine.lock_table(txn, table)
 
         # Try to update
         sql = "UPDATE %s SET %s WHERE %s" % (
diff --git a/synapse/storage/engines/postgres.py b/synapse/storage/engines/postgres.py
index 7125f66f01..64e34265f6 100644
--- a/synapse/storage/engines/postgres.py
+++ b/synapse/storage/engines/postgres.py
@@ -53,7 +53,7 @@ class PostgresEngine(object):
         return False
 
     def is_connection_closed(self, conn):
-        return bool(conn)
+        return bool(conn.closed)
 
     def lock_table(self, txn, table):
         txn.execute("LOCK TABLE %s in EXCLUSIVE MODE" % (table,))
diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py
index 54a3c9d805..fbbcce754b 100644
--- a/synapse/storage/event_federation.py
+++ b/synapse/storage/event_federation.py
@@ -13,7 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from ._base import SQLBaseStore
+from ._base import SQLBaseStore, cached
 from syutil.base64util import encode_base64
 
 import logging
@@ -96,11 +96,23 @@ class EventFederationStore(SQLBaseStore):
             room_id,
         )
 
+    @cached()
+    def get_latest_event_ids_in_room(self, room_id):
+        return self._simple_select_onecol(
+            table="event_forward_extremities",
+            keyvalues={
+                "room_id": room_id,
+            },
+            retcol="event_id",
+            desc="get_latest_events_in_room",
+        )
+
     def _get_latest_events_in_room(self, txn, room_id):
         sql = (
             "SELECT e.event_id, e.depth FROM events as e "
             "INNER JOIN event_forward_extremities as f "
             "ON e.event_id = f.event_id "
+            "AND e.room_id = f.room_id "
             "WHERE f.room_id = ?"
         )
 
@@ -318,6 +330,8 @@ class EventFederationStore(SQLBaseStore):
             )
             txn.execute(query)
 
+            self.get_latest_event_ids_in_room.invalidate(room_id)
+
     def get_backfill_events(self, room_id, event_list, limit):
         """Get a list of Events for a given topic that occurred before (and
         including) the events in event_list. Return a list of max size `limit`
diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py
index 026ba217d6..ff0a2a9e8b 100644
--- a/synapse/storage/registration.py
+++ b/synapse/storage/registration.py
@@ -112,14 +112,10 @@ class RegistrationStore(SQLBaseStore):
 
     @defer.inlineCallbacks
     def user_delete_access_tokens_apart_from(self, user_id, token_id):
-        rows = yield self.get_user_by_id(user_id)
-        if len(rows) == 0:
-            raise Exception("No such user!")
-
         yield self._execute(
             "delete_access_tokens_apart_from", None,
             "DELETE FROM access_tokens WHERE user_id = ? AND id != ?",
-            rows[0]['id'], token_id
+            user_id, token_id
         )
 
     @defer.inlineCallbacks
diff --git a/synapse/storage/room.py b/synapse/storage/room.py
index 78572bbdd2..f956377632 100644
--- a/synapse/storage/room.py
+++ b/synapse/storage/room.py
@@ -188,26 +188,21 @@ class RoomStore(SQLBaseStore):
 
     @defer.inlineCallbacks
     def get_room_name_and_aliases(self, room_id):
-        del_sql = (
-            "SELECT event_id FROM redactions WHERE redacts = e.event_id "
-            "LIMIT 1"
-        )
+        def f(txn):
+            sql = (
+                "SELECT event_id FROM current_state_events "
+                "WHERE room_id = ? "
+            )
 
-        sql = (
-            "SELECT e.*, (%(redacted)s) AS redacted FROM events as e "
-            "INNER JOIN current_state_events as c ON e.event_id = c.event_id "
-            "WHERE c.room_id = ? "
-        ) % {
-            "redacted": del_sql,
-        }
+            sql += " AND ((type = 'm.room.name' AND state_key = '')"
+            sql += " OR type = 'm.room.aliases')"
 
-        sql += " AND ((c.type = 'm.room.name' AND c.state_key = '')"
-        sql += " OR c.type = 'm.room.aliases')"
-        args = (room_id,)
+            txn.execute(sql, (room_id,))
+            results = self.cursor_to_dict(txn)
 
-        results = yield self._execute_and_decode("get_current_state", sql, *args)
+            return self._parse_events_txn(txn, results)
 
-        events = yield self._parse_events(results)
+        events = yield self.runInteraction("get_room_name_and_aliases", f)
 
         name = None
         aliases = []
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 831169e220..09fb77a194 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -154,7 +154,9 @@ class RoomMemberStore(SQLBaseStore):
             "SELECT m.room_id, m.sender, m.membership"
             " FROM room_memberships as m"
             " INNER JOIN current_state_events as c"
-            " ON m.event_id = c.event_id"
+            " ON m.event_id = c.event_id "
+            " AND m.room_id = c.room_id "
+            " AND m.user_id = c.state_key"
             " WHERE %s"
         ) % (where_clause,)
 
@@ -212,7 +214,9 @@ class RoomMemberStore(SQLBaseStore):
         sql = (
             "SELECT m.* FROM room_memberships as m"
             " INNER JOIN current_state_events as c"
-            " ON m.event_id = c.event_id"
+            " ON m.event_id = c.event_id "
+            " AND m.room_id = c.room_id "
+            " AND m.user_id = c.state_key"
             " WHERE %(where)s"
         ) % {
             "where": where_clause,
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index 95bc15c0dc..7e55e8bed6 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -128,25 +128,27 @@ class StateStore(SQLBaseStore):
 
     @defer.inlineCallbacks
     def get_current_state(self, room_id, event_type=None, state_key=""):
-        sql = (
-            "SELECT e.*, r.event_id FROM events as e"
-            " LEFT JOIN redactions as r ON r.redacts = e.event_id"
-            " INNER JOIN current_state_events as c ON e.event_id = c.event_id"
-            " WHERE c.room_id = ? "
-        )
+        def f(txn):
+            sql = (
+                "SELECT event_id FROM current_state_events"
+                " WHERE room_id = ? "
+            )
+
+            if event_type and state_key is not None:
+                sql += " AND type = ? AND state_key = ? "
+                args = (room_id, event_type, state_key)
+            elif event_type:
+                sql += " AND type = ?"
+                args = (room_id, event_type)
+            else:
+                args = (room_id, )
 
-        if event_type and state_key is not None:
-            sql += " AND c.type = ? AND c.state_key = ? "
-            args = (room_id, event_type, state_key)
-        elif event_type:
-            sql += " AND c.type = ?"
-            args = (room_id, event_type)
-        else:
-            args = (room_id, )
+            txn.execute(sql, args)
+            results = self.cursor_to_dict(txn)
 
-        results = yield self._execute_and_decode("get_current_state", sql, *args)
+            return self._parse_events_txn(txn, results)
 
-        events = yield self._parse_events(results)
+        events = yield self.runInteraction("get_current_state", f)
         defer.returnValue(events)
 
 
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index df6de7cbcd..280d4ad605 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -149,7 +149,8 @@ class StreamStore(SQLBaseStore):
         # select all the events between from/to with a sensible limit
         sql = (
             "SELECT e.event_id, e.room_id, e.type, s.state_key, "
-            "e.stream_ordering FROM events AS e LEFT JOIN state_events as s ON "
+            "e.stream_ordering FROM events AS e "
+            "LEFT JOIN state_events as s ON "
             "e.event_id = s.event_id "
             "WHERE e.stream_ordering > ? AND e.stream_ordering <= ? "
             "ORDER BY stream_ordering ASC LIMIT %(limit)d "
@@ -214,8 +215,9 @@ class StreamStore(SQLBaseStore):
 
         current_room_membership_sql = (
             "SELECT m.room_id FROM room_memberships as m "
-            "INNER JOIN current_state_events as c ON m.event_id = c.event_id "
-            "WHERE m.user_id = ? AND m.membership = 'join'"
+            " INNER JOIN current_state_events as c"
+            " ON m.event_id = c.event_id AND c.state_key = m.user_id"
+            " WHERE m.user_id = ? AND m.membership = 'join'"
         )
 
         # We also want to get any membership events about that user, e.g.