summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2014-11-26 12:06:36 +0000
committerErik Johnston <erik@matrix.org>2014-11-26 12:06:36 +0000
commit48ee9ddb22b36f1adc23290e76287a922488596d (patch)
treed9eccedd90f9c573634c5456cfbbb65684214c6e /synapse/storage
parentUse tagged version of matrix-angular-sdk (diff)
parentBump version numbers and change log (diff)
downloadsynapse-0.5.1.tar.xz
Merge branch 'release-v0.5.1' of github.com:matrix-org/synapse v0.5.1
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/__init__.py58
-rw-r--r--synapse/storage/_base.py19
-rw-r--r--synapse/storage/registration.py16
-rw-r--r--synapse/storage/room.py4
-rw-r--r--synapse/storage/roommember.py4
-rw-r--r--synapse/storage/schema/delta/v8.sql34
-rw-r--r--synapse/storage/schema/event_signatures.sql2
-rw-r--r--synapse/storage/signatures.py10
-rw-r--r--synapse/storage/state.py2
-rw-r--r--synapse/storage/stream.py11
10 files changed, 114 insertions, 46 deletions
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index 330d3b793f..1fb33171e8 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -67,7 +67,7 @@ SCHEMAS = [
 
 # Remember to update this number every time an incompatible change is made to
 # database schema files, so the users will be informed on server restarts.
-SCHEMA_VERSION = 7
+SCHEMA_VERSION = 8
 
 
 class _RollbackButIsFineException(Exception):
@@ -93,7 +93,8 @@ class DataStore(RoomMemberStore, RoomStore,
 
     @defer.inlineCallbacks
     @log_function
-    def persist_event(self, event, backfilled=False, is_new_state=True):
+    def persist_event(self, event, backfilled=False, is_new_state=True,
+                      current_state=None):
         stream_ordering = None
         if backfilled:
             if not self.min_token_deferred.called:
@@ -109,6 +110,7 @@ class DataStore(RoomMemberStore, RoomStore,
                 backfilled=backfilled,
                 stream_ordering=stream_ordering,
                 is_new_state=is_new_state,
+                current_state=current_state,
             )
         except _RollbackButIsFineException:
             pass
@@ -137,7 +139,7 @@ class DataStore(RoomMemberStore, RoomStore,
 
     @log_function
     def _persist_event_txn(self, txn, event, backfilled, stream_ordering=None,
-                           is_new_state=True):
+                           is_new_state=True, current_state=None):
         if event.type == RoomMemberEvent.TYPE:
             self._store_room_member_txn(txn, event)
         elif event.type == FeedbackEvent.TYPE:
@@ -206,8 +208,24 @@ class DataStore(RoomMemberStore, RoomStore,
 
         self._store_state_groups_txn(txn, event)
 
+        if current_state:
+            txn.execute("DELETE FROM current_state_events")
+
+            for s in current_state:
+                self._simple_insert_txn(
+                    txn,
+                    "current_state_events",
+                    {
+                        "event_id": s.event_id,
+                        "room_id": s.room_id,
+                        "type": s.type,
+                        "state_key": s.state_key,
+                    },
+                    or_replace=True,
+                )
+
         is_state = hasattr(event, "state_key") and event.state_key is not None
-        if is_new_state and is_state:
+        if is_state:
             vals = {
                 "event_id": event.event_id,
                 "room_id": event.room_id,
@@ -225,17 +243,18 @@ class DataStore(RoomMemberStore, RoomStore,
                 or_replace=True,
             )
 
-            self._simple_insert_txn(
-                txn,
-                "current_state_events",
-                {
-                    "event_id": event.event_id,
-                    "room_id": event.room_id,
-                    "type": event.type,
-                    "state_key": event.state_key,
-                },
-                or_replace=True,
-            )
+            if is_new_state:
+                self._simple_insert_txn(
+                    txn,
+                    "current_state_events",
+                    {
+                        "event_id": event.event_id,
+                        "room_id": event.room_id,
+                        "type": event.type,
+                        "state_key": event.state_key,
+                    },
+                    or_replace=True,
+                )
 
             for e_id, h in event.prev_state:
                 self._simple_insert_txn(
@@ -312,7 +331,12 @@ class DataStore(RoomMemberStore, RoomStore,
             txn, event.event_id, ref_alg, ref_hash_bytes
         )
 
-        self._update_min_depth_for_room_txn(txn, event.room_id, event.depth)
+        if not outlier:
+            self._update_min_depth_for_room_txn(
+                txn,
+                event.room_id,
+                event.depth
+            )
 
     def _store_redaction(self, txn, event):
         txn.execute(
@@ -508,7 +532,7 @@ def prepare_database(db_conn):
                 "new for the server to understand"
             )
         elif user_version < SCHEMA_VERSION:
-            logging.info(
+            logger.info(
                 "Upgrading database from version %d",
                 user_version
             )
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 5d4be09a82..4881f03368 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -57,7 +57,7 @@ class LoggingTransaction(object):
             if args and args[0]:
                 values = args[0]
                 sql_logger.debug(
-                    "[SQL values] {%s} " + ", ".join(("<%s>",) * len(values)),
+                    "[SQL values] {%s} " + ", ".join(("<%r>",) * len(values)),
                     self.name,
                     *values
                 )
@@ -91,6 +91,7 @@ class SQLBaseStore(object):
     def runInteraction(self, desc, func, *args, **kwargs):
         """Wraps the .runInteraction() method on the underlying db_pool."""
         current_context = LoggingContext.current_context()
+
         def inner_func(txn, *args, **kwargs):
             with LoggingContext("runInteraction") as context:
                 current_context.copy_to(context)
@@ -115,7 +116,6 @@ class SQLBaseStore(object):
                         "[TXN END] {%s} %f",
                         name, end - start
                     )
-
         with PreserveLoggingContext():
             result = yield self._db_pool.runInteraction(
                 inner_func, *args, **kwargs
@@ -246,7 +246,10 @@ class SQLBaseStore(object):
                 raise StoreError(404, "No row found")
 
     def _simple_select_onecol_txn(self, txn, table, keyvalues, retcol):
-        sql = "SELECT %(retcol)s FROM %(table)s WHERE %(where)s" % {
+        sql = (
+            "SELECT %(retcol)s FROM %(table)s WHERE %(where)s "
+            "ORDER BY rowid asc"
+        ) % {
             "retcol": retcol,
             "table": table,
             "where": " AND ".join("%s = ?" % k for k in keyvalues.keys()),
@@ -299,7 +302,7 @@ class SQLBaseStore(object):
             keyvalues : dict of column names and values to select the rows with
             retcols : list of strings giving the names of the columns to return
         """
-        sql = "SELECT %s FROM %s WHERE %s" % (
+        sql = "SELECT %s FROM %s WHERE %s ORDER BY rowid asc" % (
             ", ".join(retcols),
             table,
             " AND ".join("%s = ?" % (k, ) for k in keyvalues)
@@ -334,7 +337,7 @@ class SQLBaseStore(object):
                                  retcols=None, allow_none=False):
         """ Combined SELECT then UPDATE."""
         if retcols:
-            select_sql = "SELECT %s FROM %s WHERE %s" % (
+            select_sql = "SELECT %s FROM %s WHERE %s ORDER BY rowid asc" % (
                 ", ".join(retcols),
                 table,
                 " AND ".join("%s = ?" % (k) for k in keyvalues)
@@ -461,7 +464,7 @@ class SQLBaseStore(object):
     def _get_events_txn(self, txn, event_ids):
         # FIXME (erikj): This should be batched?
 
-        sql = "SELECT * FROM events WHERE event_id = ?"
+        sql = "SELECT * FROM events WHERE event_id = ? ORDER BY rowid asc"
 
         event_rows = []
         for e_id in event_ids:
@@ -478,7 +481,9 @@ class SQLBaseStore(object):
     def _parse_events_txn(self, txn, rows):
         events = [self._parse_event_from_row(r) for r in rows]
 
-        select_event_sql = "SELECT * FROM events WHERE event_id = ?"
+        select_event_sql = (
+            "SELECT * FROM events WHERE event_id = ? ORDER BY rowid asc"
+        )
 
         for i, ev in enumerate(events):
             signatures = self._get_event_signatures_txn(
diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py
index 1f89d77344..4d15005c9e 100644
--- a/synapse/storage/registration.py
+++ b/synapse/storage/registration.py
@@ -75,7 +75,9 @@ class RegistrationStore(SQLBaseStore):
                         "VALUES (?,?,?)",
                         [user_id, password_hash, now])
         except IntegrityError:
-            raise StoreError(400, "User ID already taken.", errcode=Codes.USER_IN_USE)
+            raise StoreError(
+                400, "User ID already taken.", errcode=Codes.USER_IN_USE
+            )
 
         # it's possible for this to get a conflict, but only for a single user
         # since tokens are namespaced based on their user ID
@@ -83,8 +85,8 @@ class RegistrationStore(SQLBaseStore):
                     "VALUES (?,?)", [txn.lastrowid, token])
 
     def get_user_by_id(self, user_id):
-        query = ("SELECT users.name, users.password_hash FROM users "
-                "WHERE users.name = ?")
+        query = ("SELECT users.name, users.password_hash FROM users"
+                 " WHERE users.name = ?")
         return self._execute(
             self.cursor_to_dict,
             query, user_id
@@ -120,10 +122,10 @@ class RegistrationStore(SQLBaseStore):
 
     def _query_for_auth(self, txn, token):
         sql = (
-            "SELECT users.name, users.admin, access_tokens.device_id "
-            "FROM users "
-            "INNER JOIN access_tokens on users.id = access_tokens.user_id "
-            "WHERE token = ?"
+            "SELECT users.name, users.admin, access_tokens.device_id"
+            " FROM users"
+            " INNER JOIN access_tokens on users.id = access_tokens.user_id"
+            " WHERE token = ?"
         )
 
         cursor = txn.execute(sql, (token,))
diff --git a/synapse/storage/room.py b/synapse/storage/room.py
index cc0513b8d2..2378d65943 100644
--- a/synapse/storage/room.py
+++ b/synapse/storage/room.py
@@ -27,7 +27,9 @@ import logging
 logger = logging.getLogger(__name__)
 
 
-OpsLevel = collections.namedtuple("OpsLevel", ("ban_level", "kick_level", "redact_level"))
+OpsLevel = collections.namedtuple("OpsLevel", (
+    "ban_level", "kick_level", "redact_level")
+)
 
 
 class RoomStore(SQLBaseStore):
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 93329703a2..c37df59d45 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -177,8 +177,8 @@ class RoomMemberStore(SQLBaseStore):
         return self._get_members_query(clause, vals)
 
     def _get_members_query(self, where_clause, where_values):
-        return self._db_pool.runInteraction(
-            self._get_members_query_txn,
+        return self.runInteraction(
+            "get_members_query", self._get_members_query_txn,
             where_clause, where_values
         )
 
diff --git a/synapse/storage/schema/delta/v8.sql b/synapse/storage/schema/delta/v8.sql
new file mode 100644
index 0000000000..daf6646ed5
--- /dev/null
+++ b/synapse/storage/schema/delta/v8.sql
@@ -0,0 +1,34 @@
+/* Copyright 2014 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ CREATE TABLE IF NOT EXISTS event_signatures_2 (
+    event_id TEXT,
+    signature_name TEXT,
+    key_id TEXT,
+    signature BLOB,
+    CONSTRAINT uniqueness UNIQUE (event_id, signature_name, key_id)
+);
+
+INSERT INTO event_signatures_2 (event_id, signature_name, key_id, signature)
+SELECT event_id, signature_name, key_id, signature FROM event_signatures;
+
+DROP TABLE event_signatures;
+ALTER TABLE event_signatures_2 RENAME TO event_signatures;
+
+CREATE INDEX IF NOT EXISTS event_signatures_id ON event_signatures (
+    event_id
+);
+
+PRAGMA user_version = 8;
\ No newline at end of file
diff --git a/synapse/storage/schema/event_signatures.sql b/synapse/storage/schema/event_signatures.sql
index 4efa8a3e63..b6b56b47a2 100644
--- a/synapse/storage/schema/event_signatures.sql
+++ b/synapse/storage/schema/event_signatures.sql
@@ -42,7 +42,7 @@ CREATE TABLE IF NOT EXISTS event_signatures (
     signature_name TEXT,
     key_id TEXT,
     signature BLOB,
-    CONSTRAINT uniqueness UNIQUE (event_id, key_id)
+    CONSTRAINT uniqueness UNIQUE (event_id, signature_name, key_id)
 );
 
 CREATE INDEX IF NOT EXISTS event_signatures_id ON event_signatures (
diff --git a/synapse/storage/signatures.py b/synapse/storage/signatures.py
index d90e08fff1..eea4f21065 100644
--- a/synapse/storage/signatures.py
+++ b/synapse/storage/signatures.py
@@ -36,7 +36,7 @@ class SignatureStore(SQLBaseStore):
         return dict(txn.fetchall())
 
     def _store_event_content_hash_txn(self, txn, event_id, algorithm,
-                                    hash_bytes):
+                                      hash_bytes):
         """Store a hash for a Event
         Args:
             txn (cursor):
@@ -84,7 +84,7 @@ class SignatureStore(SQLBaseStore):
         return dict(txn.fetchall())
 
     def _store_event_reference_hash_txn(self, txn, event_id, algorithm,
-                                      hash_bytes):
+                                        hash_bytes):
         """Store a hash for a PDU
         Args:
             txn (cursor):
@@ -127,7 +127,7 @@ class SignatureStore(SQLBaseStore):
         return res
 
     def _store_event_signature_txn(self, txn, event_id, signature_name, key_id,
-                                          signature_bytes):
+                                   signature_bytes):
         """Store a signature from the origin server for a PDU.
         Args:
             txn (cursor):
@@ -169,7 +169,7 @@ class SignatureStore(SQLBaseStore):
         return results
 
     def _store_prev_event_hash_txn(self, txn, event_id, prev_event_id,
-                                 algorithm, hash_bytes):
+                                   algorithm, hash_bytes):
         self._simple_insert_txn(
             txn,
             "event_edge_hashes",
@@ -180,4 +180,4 @@ class SignatureStore(SQLBaseStore):
                 "hash": buffer(hash_bytes),
             },
             or_ignore=True,
-        )
\ No newline at end of file
+        )
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index 55ea567793..e0f44b3e59 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -87,7 +87,7 @@ class StateStore(SQLBaseStore):
         )
 
     def _store_state_groups_txn(self, txn, event):
-        if not event.state_events:
+        if event.state_events is None:
             return
 
         state_group = event.state_group
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index a954024678..b84735e61c 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -213,8 +213,8 @@ class StreamStore(SQLBaseStore):
         # Tokens really represent positions between elements, but we use
         # the convention of pointing to the event before the gap. Hence
         # we have a bit of asymmetry when it comes to equalities.
-        from_comp = '<=' if direction =='b' else '>'
-        to_comp = '>' if direction =='b' else '<='
+        from_comp = '<=' if direction == 'b' else '>'
+        to_comp = '>' if direction == 'b' else '<='
         order = "DESC" if direction == 'b' else "ASC"
 
         args = [room_id]
@@ -235,9 +235,10 @@ class StreamStore(SQLBaseStore):
         )
 
         sql = (
-            "SELECT *, (%(redacted)s) AS redacted FROM events "
-            "WHERE outlier = 0 AND room_id = ? AND %(bounds)s "
-            "ORDER BY topological_ordering %(order)s, stream_ordering %(order)s %(limit)s "
+            "SELECT *, (%(redacted)s) AS redacted FROM events"
+            " WHERE outlier = 0 AND room_id = ? AND %(bounds)s"
+            " ORDER BY topological_ordering %(order)s,"
+            " stream_ordering %(order)s %(limit)s"
         ) % {
             "redacted": del_sql,
             "bounds": bounds,