summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
author David Baker <dave@matrix.org> 2015-05-07 09:33:42 +0100
committer David Baker <dave@matrix.org> 2015-05-07 09:33:42 +0100
commit 97a64f3ebeaac9d9e97699f41751e272fc0c1c73 (patch)
tree 30369edfc0af5da6f41f32932936ec363e844787 /synapse/storage
parent Typo (diff)
parent Optional profiling using cProfile (diff)
download synapse-97a64f3ebeaac9d9e97699f41751e272fc0c1c73.tar.xz
Merge branch 'develop' of github.com:matrix-org/synapse into develop
Diffstat (limited to 'synapse/storage')
-rw-r--r-- synapse/storage/_base.py 134
-rw-r--r-- synapse/storage/engines/postgres.py 3
-rw-r--r-- synapse/storage/engines/sqlite3.py 3
-rw-r--r-- synapse/storage/event_federation.py 40
-rw-r--r-- synapse/storage/events.py 76
-rw-r--r-- synapse/storage/roommember.py 4
-rw-r--r-- synapse/storage/schema/delta/17/drop_indexes.sql 18
-rw-r--r-- synapse/storage/state.py 16
-rw-r--r-- synapse/storage/transactions.py 6
9 files changed, 198 insertions, 102 deletions
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index c328b5274c..ee5587c721 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -31,7 +31,9 @@ import functools
 import simplejson as json
 import sys
 import time
+import threading
 
+DEBUG_CACHES = False
 
 logger = logging.getLogger(__name__)
 
@@ -68,9 +70,20 @@ class Cache(object):
 
         self.name = name
         self.keylen = keylen
-
+        self.sequence = 0
+        self.thread = None
         caches_by_name[name] = self.cache
 
+    def check_thread(self):
+        expected_thread = self.thread
+        if expected_thread is None:
+            self.thread = threading.current_thread()
+        else:
+            if expected_thread is not threading.current_thread():
+                raise ValueError(
+                    "Cache objects can only be accessed from the main thread"
+                )
+
     def get(self, *keyargs):
         if len(keyargs) != self.keylen:
             raise ValueError("Expected a key to have %d items", self.keylen)
@@ -82,6 +95,13 @@ class Cache(object):
         cache_counter.inc_misses(self.name)
         raise KeyError()
 
+    def update(self, sequence, *args):
+        self.check_thread()
+        if self.sequence == sequence:
+            # Only update the cache if the cache's sequence number matches the
+            # number that the cache had before the SELECT was started (SYN-369)
+            self.prefill(*args)
+
     def prefill(self, *args):  # because I can't  *keyargs, value
         keyargs = args[:-1]
         value = args[-1]
@@ -96,9 +116,12 @@ class Cache(object):
         self.cache[keyargs] = value
 
     def invalidate(self, *keyargs):
+        self.check_thread()
         if len(keyargs) != self.keylen:
             raise ValueError("Expected a key to have %d items", self.keylen)
-
+        # Increment the sequence number so that any SELECT statements that
+        # raced with the INSERT don't update the cache (SYN-369)
+        self.sequence += 1
         self.cache.pop(keyargs, None)
 
 
@@ -128,11 +151,26 @@ def cached(max_entries=1000, num_args=1, lru=False):
         @defer.inlineCallbacks
         def wrapped(self, *keyargs):
             try:
-                defer.returnValue(cache.get(*keyargs))
+                cached_result = cache.get(*keyargs)
+                if DEBUG_CACHES:
+                    actual_result = yield orig(self, *keyargs)
+                    if actual_result != cached_result:
+                        logger.error(
+                            "Stale cache entry %s%r: cached: %r, actual %r",
+                            orig.__name__, keyargs,
+                            cached_result, actual_result,
+                        )
+                        raise ValueError("Stale cache entry")
+                defer.returnValue(cached_result)
             except KeyError:
+                # Get the sequence number of the cache before reading from the
+                # database so that we can tell if the cache is invalidated
+                # while the SELECT is executing (SYN-369)
+                sequence = cache.sequence
+
                 ret = yield orig(self, *keyargs)
 
-                cache.prefill(*keyargs + (ret,))
+                cache.update(sequence, *keyargs + (ret,))
 
                 defer.returnValue(ret)
 
@@ -147,12 +185,20 @@ class LoggingTransaction(object):
     """An object that almost-transparently proxies for the 'txn' object
     passed to the constructor. Adds logging and metrics to the .execute()
     method."""
-    __slots__ = ["txn", "name", "database_engine"]
+    __slots__ = ["txn", "name", "database_engine", "after_callbacks"]
 
-    def __init__(self, txn, name, database_engine):
+    def __init__(self, txn, name, database_engine, after_callbacks):
         object.__setattr__(self, "txn", txn)
         object.__setattr__(self, "name", name)
         object.__setattr__(self, "database_engine", database_engine)
+        object.__setattr__(self, "after_callbacks", after_callbacks)
+
+    def call_after(self, callback, *args):
+        """Call the given callback on the main twisted thread after the
+        transaction has finished. Used to invalidate the caches on the
+        correct thread.
+        """
+        self.after_callbacks.append((callback, args))
 
     def __getattr__(self, name):
         return getattr(self.txn, name)
@@ -160,22 +206,23 @@ class LoggingTransaction(object):
     def __setattr__(self, name, value):
         setattr(self.txn, name, value)
 
-    def execute(self, sql, *args, **kwargs):
+    def execute(self, sql, *args):
+        self._do_execute(self.txn.execute, sql, *args)
+
+    def executemany(self, sql, *args):
+        self._do_execute(self.txn.executemany, sql, *args)
+
+    def _do_execute(self, func, sql, *args):
         # TODO(paul): Maybe use 'info' and 'debug' for values?
         sql_logger.debug("[SQL] {%s} %s", self.name, sql)
 
         sql = self.database_engine.convert_param_style(sql)
 
-        if args and args[0]:
-            args = list(args)
-            args[0] = [
-                self.database_engine.encode_parameter(a) for a in args[0]
-            ]
+        if args:
             try:
                 sql_logger.debug(
-                    "[SQL values] {%s} " + ", ".join(("<%r>",) * len(args[0])),
-                    self.name,
-                    *args[0]
+                    "[SQL values] {%s} %r",
+                    self.name, args[0]
                 )
             except:
                 # Don't let logging failures stop SQL from working
@@ -184,8 +231,8 @@ class LoggingTransaction(object):
         start = time.time() * 1000
 
         try:
-            return self.txn.execute(
-                sql, *args, **kwargs
+            return func(
+                sql, *args
             )
         except Exception as e:
             logger.debug("[SQL FAIL] {%s} %s", self.name, e)
@@ -298,6 +345,8 @@ class SQLBaseStore(object):
 
         start_time = time.time() * 1000
 
+        after_callbacks = []
+
         def inner_func(conn, *args, **kwargs):
             with LoggingContext("runInteraction") as context:
                 if self.database_engine.is_connection_closed(conn):
@@ -322,10 +371,10 @@ class SQLBaseStore(object):
                     while True:
                         try:
                             txn = conn.cursor()
-                            return func(
-                                LoggingTransaction(txn, name, self.database_engine),
-                                *args, **kwargs
+                            txn = LoggingTransaction(
+                                txn, name, self.database_engine, after_callbacks
                             )
+                            return func(txn, *args, **kwargs)
                         except self.database_engine.module.OperationalError as e:
                             # This can happen if the database disappears mid
                             # transaction.
@@ -374,6 +423,8 @@ class SQLBaseStore(object):
             result = yield self._db_pool.runWithConnection(
                 inner_func, *args, **kwargs
             )
+        for after_callback, after_args in after_callbacks:
+            after_callback(*after_args)
         defer.returnValue(result)
 
     def cursor_to_dict(self, cursor):
@@ -438,18 +489,49 @@ class SQLBaseStore(object):
 
     @log_function
     def _simple_insert_txn(self, txn, table, values):
+        keys, vals = zip(*values.items())
+
         sql = "INSERT INTO %s (%s) VALUES(%s)" % (
             table,
-            ", ".join(k for k in values),
-            ", ".join("?" for k in values)
+            ", ".join(k for k in keys),
+            ", ".join("?" for _ in keys)
         )
 
-        logger.debug(
-            "[SQL] %s Args=%s",
-            sql, values.values(),
+        txn.execute(sql, vals)
+
+    def _simple_insert_many_txn(self, txn, table, values):
+        if not values:
+            return
+
+        # This is a *slight* abomination to get a list of tuples of key names
+        # and a list of tuples of value names.
+        #
+        # i.e. [{"a": 1, "b": 2}, {"c": 3, "d": 4}]
+        #         => [("a", "b",), ("c", "d",)] and [(1, 2,), (3, 4,)]
+        #
+        # The sort is to ensure that we don't rely on dictionary iteration
+        # order.
+        keys, vals = zip(*[
+            zip(
+                *(sorted(i.items(), key=lambda kv: kv[0]))
+            )
+            for i in values
+            if i
+        ])
+
+        for k in keys:
+            if k != keys[0]:
+                raise RuntimeError(
+                    "All items must have the same keys"
+                )
+
+        sql = "INSERT INTO %s (%s) VALUES(%s)" % (
+            table,
+            ", ".join(k for k in keys[0]),
+            ", ".join("?" for _ in keys[0])
         )
 
-        txn.execute(sql, values.values())
+        txn.executemany(sql, vals)
 
     def _simple_upsert(self, table, keyvalues, values,
                        insertion_values={}, desc="_simple_upsert", lock=True):
diff --git a/synapse/storage/engines/postgres.py b/synapse/storage/engines/postgres.py
index 64e34265f6..a323028546 100644
--- a/synapse/storage/engines/postgres.py
+++ b/synapse/storage/engines/postgres.py
@@ -36,9 +36,6 @@ class PostgresEngine(object):
     def convert_param_style(self, sql):
         return sql.replace("?", "%s")
 
-    def encode_parameter(self, param):
-        return param
-
     def on_new_connection(self, db_conn):
         db_conn.set_isolation_level(
             self.module.extensions.ISOLATION_LEVEL_REPEATABLE_READ
diff --git a/synapse/storage/engines/sqlite3.py b/synapse/storage/engines/sqlite3.py
index 7b49157cbd..ff13d8006a 100644
--- a/synapse/storage/engines/sqlite3.py
+++ b/synapse/storage/engines/sqlite3.py
@@ -26,9 +26,6 @@ class Sqlite3Engine(object):
     def convert_param_style(self, sql):
         return sql
 
-    def encode_parameter(self, param):
-        return param
-
     def on_new_connection(self, db_conn):
         self.prepare_database(db_conn)
 
diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py
index fbbcce754b..74b4e23590 100644
--- a/synapse/storage/event_federation.py
+++ b/synapse/storage/event_federation.py
@@ -104,7 +104,7 @@ class EventFederationStore(SQLBaseStore):
                 "room_id": room_id,
             },
             retcol="event_id",
-            desc="get_latest_events_in_room",
+            desc="get_latest_event_ids_in_room",
         )
 
     def _get_latest_events_in_room(self, txn, room_id):
@@ -262,18 +262,19 @@ class EventFederationStore(SQLBaseStore):
         For the given event, update the event edges table and forward and
         backward extremities tables.
         """
-        for e_id, _ in prev_events:
-            # TODO (erikj): This could be done as a bulk insert
-            self._simple_insert_txn(
-                txn,
-                table="event_edges",
-                values={
+        self._simple_insert_many_txn(
+            txn,
+            table="event_edges",
+            values=[
+                {
                     "event_id": event_id,
                     "prev_event_id": e_id,
                     "room_id": room_id,
                     "is_state": False,
-                },
-            )
+                }
+                for e_id, _ in prev_events
+            ],
+        )
 
         # Update the extremities table if this is not an outlier.
         if not outlier:
@@ -307,16 +308,17 @@ class EventFederationStore(SQLBaseStore):
 
             # Insert all the prev_events as a backwards thing, they'll get
             # deleted in a second if they're incorrect anyway.
-            for e_id, _ in prev_events:
-                # TODO (erikj): This could be done as a bulk insert
-                self._simple_insert_txn(
-                    txn,
-                    table="event_backward_extremities",
-                    values={
+            self._simple_insert_many_txn(
+                txn,
+                table="event_backward_extremities",
+                values=[
+                    {
                         "event_id": e_id,
                         "room_id": room_id,
-                    },
-                )
+                    }
+                    for e_id, _ in prev_events
+                ],
+            )
 
             # Also delete from the backwards extremities table all ones that
             # reference events that we have already seen
@@ -330,7 +332,9 @@ class EventFederationStore(SQLBaseStore):
             )
             txn.execute(query)
 
-            self.get_latest_event_ids_in_room.invalidate(room_id)
+            txn.call_after(
+                self.get_latest_event_ids_in_room.invalidate, room_id
+            )
 
     def get_backfill_events(self, room_id, event_list, limit):
         """Get a list of Events for a given topic that occurred before (and
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index a3c260ddc4..38395c66ab 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -93,7 +93,7 @@ class EventsStore(SQLBaseStore):
                            current_state=None):
 
         # Remove the any existing cache entries for the event_id
-        self._invalidate_get_event_cache(event.event_id)
+        txn.call_after(self._invalidate_get_event_cache, event.event_id)
 
         if stream_ordering is None:
             with self._stream_id_gen.get_next_txn(txn) as stream_ordering:
@@ -114,6 +114,13 @@ class EventsStore(SQLBaseStore):
             )
 
             for s in current_state:
+                if s.type == EventTypes.Member:
+                    txn.call_after(
+                        self.get_rooms_for_user.invalidate, s.state_key
+                    )
+                    txn.call_after(
+                        self.get_joined_hosts_for_room.invalidate, s.room_id
+                    )
                 self._simple_insert_txn(
                     txn,
                     "current_state_events",
@@ -122,31 +129,9 @@ class EventsStore(SQLBaseStore):
                         "room_id": s.room_id,
                         "type": s.type,
                         "state_key": s.state_key,
-                    },
-                )
-
-        if event.is_state() and is_new_state:
-            if not backfilled and not context.rejected:
-                self._simple_insert_txn(
-                    txn,
-                    table="state_forward_extremities",
-                    values={
-                        "event_id": event.event_id,
-                        "room_id": event.room_id,
-                        "type": event.type,
-                        "state_key": event.state_key,
-                    },
+                    }
                 )
 
-                for prev_state_id, _ in event.prev_state:
-                    self._simple_delete_txn(
-                        txn,
-                        table="state_forward_extremities",
-                        keyvalues={
-                            "event_id": prev_state_id,
-                        }
-                    )
-
         outlier = event.internal_metadata.is_outlier()
 
         if not outlier:
@@ -281,7 +266,9 @@ class EventsStore(SQLBaseStore):
         )
 
         if context.rejected:
-            self._store_rejections_txn(txn, event.event_id, context.rejected)
+            self._store_rejections_txn(
+                txn, event.event_id, context.rejected
+            )
 
         for hash_alg, hash_base64 in event.hashes.items():
             hash_bytes = decode_base64(hash_base64)
@@ -293,19 +280,22 @@ class EventsStore(SQLBaseStore):
             for alg, hash_base64 in prev_hashes.items():
                 hash_bytes = decode_base64(hash_base64)
                 self._store_prev_event_hash_txn(
-                    txn, event.event_id, prev_event_id, alg, hash_bytes
+                    txn, event.event_id, prev_event_id, alg,
+                    hash_bytes
                 )
 
-        for auth_id, _ in event.auth_events:
-            self._simple_insert_txn(
-                txn,
-                table="event_auth",
-                values={
+        self._simple_insert_many_txn(
+            txn,
+            table="event_auth",
+            values=[
+                {
                     "event_id": event.event_id,
                     "room_id": event.room_id,
                     "auth_id": auth_id,
-                },
-            )
+                }
+                for auth_id, _ in event.auth_events
+            ],
+        )
 
         (ref_alg, ref_hash_bytes) = compute_event_reference_hash(event)
         self._store_event_reference_hash_txn(
@@ -330,17 +320,19 @@ class EventsStore(SQLBaseStore):
                 vals,
             )
 
-            for e_id, h in event.prev_state:
-                self._simple_insert_txn(
-                    txn,
-                    table="event_edges",
-                    values={
+            self._simple_insert_many_txn(
+                txn,
+                table="event_edges",
+                values=[
+                    {
                         "event_id": event.event_id,
                         "prev_event_id": e_id,
                         "room_id": event.room_id,
                         "is_state": True,
-                    },
-                )
+                    }
+                    for e_id, h in event.prev_state
+                ],
+            )
 
             if is_new_state and not context.rejected:
                 self._simple_upsert_txn(
@@ -356,9 +348,11 @@ class EventsStore(SQLBaseStore):
                     }
                 )
 
+        return
+
     def _store_redaction(self, txn, event):
         # invalidate the cache for the redacted event
-        self._invalidate_get_event_cache(event.redacts)
+        txn.call_after(self._invalidate_get_event_cache, event.redacts)
         txn.execute(
             "INSERT INTO redactions (event_id, redacts) VALUES (?,?)",
             (event.event_id, event.redacts)
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 09fb77a194..839c74f63a 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -64,8 +64,8 @@ class RoomMemberStore(SQLBaseStore):
             }
         )
 
-        self.get_rooms_for_user.invalidate(target_user_id)
-        self.get_joined_hosts_for_room.invalidate(event.room_id)
+        txn.call_after(self.get_rooms_for_user.invalidate, target_user_id)
+        txn.call_after(self.get_joined_hosts_for_room.invalidate, event.room_id)
 
     def get_room_member(self, user_id, room_id):
         """Retrieve the current state of a room member.
diff --git a/synapse/storage/schema/delta/17/drop_indexes.sql b/synapse/storage/schema/delta/17/drop_indexes.sql
new file mode 100644
index 0000000000..8eb3325a6b
--- /dev/null
+++ b/synapse/storage/schema/delta/17/drop_indexes.sql
@@ -0,0 +1,18 @@
+/* Copyright 2015 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+DROP INDEX IF EXISTS sent_transaction_dest;
+DROP INDEX IF EXISTS sent_transaction_sent;
+DROP INDEX IF EXISTS user_ips_user;
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index 7e55e8bed6..dbc0e49c1f 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -104,18 +104,20 @@ class StateStore(SQLBaseStore):
                 },
             )
 
-            for state in state_events.values():
-                self._simple_insert_txn(
-                    txn,
-                    table="state_groups_state",
-                    values={
+            self._simple_insert_many_txn(
+                txn,
+                table="state_groups_state",
+                values=[
+                    {
                         "state_group": state_group,
                         "room_id": state.room_id,
                         "type": state.type,
                         "state_key": state.state_key,
                         "event_id": state.event_id,
-                    },
-                )
+                    }
+                    for state in state_events.values()
+                ],
+            )
 
         self._simple_insert_txn(
             txn,
diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py
index 89dd7d8947..624da4a9dc 100644
--- a/synapse/storage/transactions.py
+++ b/synapse/storage/transactions.py
@@ -17,6 +17,7 @@ from ._base import SQLBaseStore, cached
 
 from collections import namedtuple
 
+from syutil.jsonutil import encode_canonical_json
 import logging
 
 logger = logging.getLogger(__name__)
@@ -82,7 +83,7 @@ class TransactionStore(SQLBaseStore):
                 "transaction_id": transaction_id,
                 "origin": origin,
                 "response_code": code,
-                "response_json": response_dict,
+                "response_json": buffer(encode_canonical_json(response_dict)),
             },
             or_ignore=True,
             desc="set_received_txn_response",
@@ -161,7 +162,8 @@ class TransactionStore(SQLBaseStore):
         return self.runInteraction(
             "delivered_txn",
             self._delivered_txn,
-            transaction_id, destination, code, response_dict
+            transaction_id, destination, code,
+            buffer(encode_canonical_json(response_dict)),
         )
 
     def _delivered_txn(self, txn, transaction_id, destination,