summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
authorDavid Baker <dave@matrix.org>2015-01-13 13:15:51 +0000
committerDavid Baker <dave@matrix.org>2015-01-13 13:15:51 +0000
commitc06a9063e1d838f776edfd79cfc8ab29c748d794 (patch)
treebcfa472e65d4dacbab666d5787eff9293e5ccc41 /synapse/storage
parentSplit out function to decide whether to notify or a given event (diff)
parentMerge branch 'hotfixes-v0.6.1b' of github.com:matrix-org/synapse into develop (diff)
downloadsynapse-c06a9063e1d838f776edfd79cfc8ab29c748d794.tar.xz
Merge branch 'develop' into pushers
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/__init__.py8
-rw-r--r--synapse/storage/_base.py52
-rw-r--r--synapse/storage/directory.py2
-rw-r--r--synapse/storage/event_federation.py26
-rw-r--r--synapse/storage/feedback.py2
-rw-r--r--synapse/storage/keys.py2
-rw-r--r--synapse/storage/media_repository.py2
-rw-r--r--synapse/storage/presence.py2
-rw-r--r--synapse/storage/profile.py2
-rw-r--r--synapse/storage/registration.py2
-rw-r--r--synapse/storage/room.py10
-rw-r--r--synapse/storage/roommember.py93
-rw-r--r--synapse/storage/schema/delta/v11.sql16
-rw-r--r--synapse/storage/schema/delta/v2.sql2
-rw-r--r--synapse/storage/schema/delta/v3.sql2
-rw-r--r--synapse/storage/schema/delta/v4.sql14
-rw-r--r--synapse/storage/schema/delta/v5.sql14
-rw-r--r--synapse/storage/schema/delta/v6.sql2
-rw-r--r--synapse/storage/schema/delta/v8.sql2
-rw-r--r--synapse/storage/schema/delta/v9.sql2
-rw-r--r--synapse/storage/schema/event_edges.sql14
-rw-r--r--synapse/storage/schema/event_signatures.sql2
-rw-r--r--synapse/storage/schema/im.sql2
-rw-r--r--synapse/storage/schema/keys.sql2
-rw-r--r--synapse/storage/schema/media_repository.sql2
-rw-r--r--synapse/storage/schema/presence.sql2
-rw-r--r--synapse/storage/schema/profiles.sql2
-rw-r--r--synapse/storage/schema/redactions.sql14
-rw-r--r--synapse/storage/schema/room_aliases.sql2
-rw-r--r--synapse/storage/schema/state.sql2
-rw-r--r--synapse/storage/schema/transactions.sql3
-rw-r--r--synapse/storage/schema/users.sql2
-rw-r--r--synapse/storage/signatures.py2
-rw-r--r--synapse/storage/state.py16
-rw-r--r--synapse/storage/stream.py151
-rw-r--r--synapse/storage/transactions.py2
36 files changed, 294 insertions, 183 deletions
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index ac3bf5cee5..fa7ad0eea8 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -1,5 +1,5 @@
 # -*- coding: utf-8 -*-
-# Copyright 2014 OpenMarket Ltd
+# Copyright 2014, 2015 OpenMarket Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -68,7 +68,7 @@ SCHEMAS = [
 
 # Remember to update this number every time an incompatible change is made to
 # database schema files, so the users will be informed on server restarts.
-SCHEMA_VERSION = 10
+SCHEMA_VERSION = 11
 
 
 class _RollbackButIsFineException(Exception):
@@ -146,9 +146,7 @@ class DataStore(RoomMemberStore, RoomStore,
         elif event.type == EventTypes.Redaction:
             self._store_redaction(txn, event)
 
-        outlier = False
-        if hasattr(event.internal_metadata, "outlier"):
-            outlier = event.internal_metadata.outlier
+        outlier = event.internal_metadata.is_outlier()
 
         event_dict = {
             k: v
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index efb2664680..4f172d3967 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -1,5 +1,5 @@
 # -*- coding: utf-8 -*-
-# Copyright 2014 OpenMarket Ltd
+# Copyright 2014, 2015 OpenMarket Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -64,7 +64,7 @@ class LoggingTransaction(object):
             # Don't let logging failures stop SQL from working
             pass
 
-        start = time.clock() * 1000
+        start = time.time() * 1000
         try:
             return self.txn.execute(
                 sql, *args, **kwargs
@@ -73,7 +73,7 @@ class LoggingTransaction(object):
                 logger.exception("[SQL FAIL] {%s}", self.name)
                 raise
         finally:
-            end = time.clock() * 1000
+            end = time.time() * 1000
             sql_logger.debug("[SQL time] {%s} %f", self.name, end - start)
 
 
@@ -93,7 +93,7 @@ class SQLBaseStore(object):
         def inner_func(txn, *args, **kwargs):
             with LoggingContext("runInteraction") as context:
                 current_context.copy_to(context)
-                start = time.clock() * 1000
+                start = time.time() * 1000
                 txn_id = SQLBaseStore._TXN_ID
 
                 # We don't really need these to be unique, so lets stop it from
@@ -109,7 +109,7 @@ class SQLBaseStore(object):
                     logger.exception("[TXN FAIL] {%s}", name)
                     raise
                 finally:
-                    end = time.clock() * 1000
+                    end = time.time() * 1000
                     transaction_logger.debug(
                         "[TXN END] {%s} %f",
                         name, end - start
@@ -479,23 +479,31 @@ class SQLBaseStore(object):
 
         return self.runInteraction("_simple_max_id", func)
 
-    def _get_events(self, event_ids):
+    def _get_events(self, event_ids, check_redacted=True,
+                    get_prev_content=False):
         return self.runInteraction(
-            "_get_events", self._get_events_txn, event_ids
+            "_get_events", self._get_events_txn, event_ids,
+            check_redacted=check_redacted, get_prev_content=get_prev_content,
         )
 
-    def _get_events_txn(self, txn, event_ids):
-        events = []
-        for e_id in event_ids:
-            ev = self._get_event_txn(txn, e_id)
+    def _get_events_txn(self, txn, event_ids, check_redacted=True,
+                        get_prev_content=False):
+        if not event_ids:
+            return []
 
-            if ev:
-                events.append(ev)
+        events = [
+            self._get_event_txn(
+                txn, event_id,
+                check_redacted=check_redacted,
+                get_prev_content=get_prev_content
+            )
+            for event_id in event_ids
+        ]
 
-        return events
+        return [e for e in events if e]
 
     def _get_event_txn(self, txn, event_id, check_redacted=True,
-                       get_prev_content=True):
+                       get_prev_content=False):
         sql = (
             "SELECT internal_metadata, json, r.event_id FROM event_json as e "
             "LEFT JOIN redactions as r ON e.event_id = r.redacts "
@@ -512,6 +520,14 @@ class SQLBaseStore(object):
 
         internal_metadata, js, redacted = res
 
+        return self._get_event_from_row_txn(
+            txn, internal_metadata, js, redacted,
+            check_redacted=check_redacted,
+            get_prev_content=get_prev_content,
+        )
+
+    def _get_event_from_row_txn(self, txn, internal_metadata, js, redacted,
+                                check_redacted=True, get_prev_content=False):
         d = json.loads(js)
         internal_metadata = json.loads(internal_metadata)
 
@@ -533,11 +549,13 @@ class SQLBaseStore(object):
                 ev.unsigned["redacted_because"] = because
 
         if get_prev_content and "replaces_state" in ev.unsigned:
-            ev.unsigned["prev_content"] = self._get_event_txn(
+            prev = self._get_event_txn(
                 txn,
                 ev.unsigned["replaces_state"],
                 get_prev_content=False,
-            ).get_dict()["content"]
+            )
+            if prev:
+                ev.unsigned["prev_content"] = prev.get_dict()["content"]
 
         return ev
 
diff --git a/synapse/storage/directory.py b/synapse/storage/directory.py
index 2be9c41374..68b7d59693 100644
--- a/synapse/storage/directory.py
+++ b/synapse/storage/directory.py
@@ -1,5 +1,5 @@
 # -*- coding: utf-8 -*-
-# Copyright 2014 OpenMarket Ltd
+# Copyright 2014, 2015 OpenMarket Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py
index ced066f407..0cbcdd1b55 100644
--- a/synapse/storage/event_federation.py
+++ b/synapse/storage/event_federation.py
@@ -1,5 +1,5 @@
 # -*- coding: utf-8 -*-
-# Copyright 2014 OpenMarket Ltd
+# Copyright 2014, 2015 OpenMarket Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -32,39 +32,33 @@ class EventFederationStore(SQLBaseStore):
     and backfilling from another server respectively.
     """
 
-    def get_auth_chain(self, event_id):
+    def get_auth_chain(self, event_ids):
         return self.runInteraction(
             "get_auth_chain",
             self._get_auth_chain_txn,
-            event_id
+            event_ids
         )
 
-    def _get_auth_chain_txn(self, txn, event_id):
-        results = self._get_auth_chain_ids_txn(txn, event_id)
+    def _get_auth_chain_txn(self, txn, event_ids):
+        results = self._get_auth_chain_ids_txn(txn, event_ids)
 
-        sql = "SELECT * FROM events WHERE event_id = ?"
-        rows = []
-        for ev_id in results:
-            c = txn.execute(sql, (ev_id,))
-            rows.extend(self.cursor_to_dict(c))
+        return self._get_events_txn(txn, results)
 
-        return self._parse_events_txn(txn, rows)
-
-    def get_auth_chain_ids(self, event_id):
+    def get_auth_chain_ids(self, event_ids):
         return self.runInteraction(
             "get_auth_chain_ids",
             self._get_auth_chain_ids_txn,
-            event_id
+            event_ids
         )
 
-    def _get_auth_chain_ids_txn(self, txn, event_id):
+    def _get_auth_chain_ids_txn(self, txn, event_ids):
         results = set()
 
         base_sql = (
             "SELECT auth_id FROM event_auth WHERE %s"
         )
 
-        front = set([event_id])
+        front = set(event_ids)
         while front:
             sql = base_sql % (
                 " OR ".join(["event_id=?"] * len(front)),
diff --git a/synapse/storage/feedback.py b/synapse/storage/feedback.py
index 21511577c5..fcf011b234 100644
--- a/synapse/storage/feedback.py
+++ b/synapse/storage/feedback.py
@@ -1,5 +1,5 @@
 # -*- coding: utf-8 -*-
-# Copyright 2014 OpenMarket Ltd
+# Copyright 2014, 2015 OpenMarket Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
diff --git a/synapse/storage/keys.py b/synapse/storage/keys.py
index fd705138e6..1f244019fc 100644
--- a/synapse/storage/keys.py
+++ b/synapse/storage/keys.py
@@ -1,5 +1,5 @@
 # -*- coding: utf-8 -*-
-# Copyright 2014 OpenMarket Ltd
+# Copyright 2014, 2015 OpenMarket Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
diff --git a/synapse/storage/media_repository.py b/synapse/storage/media_repository.py
index 18c068d3d9..7101d2beec 100644
--- a/synapse/storage/media_repository.py
+++ b/synapse/storage/media_repository.py
@@ -1,5 +1,5 @@
 # -*- coding: utf-8 -*-
-# Copyright 2014 OpenMarket Ltd
+# Copyright 2014, 2015 OpenMarket Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
diff --git a/synapse/storage/presence.py b/synapse/storage/presence.py
index 71b2bb084d..1dcd34723b 100644
--- a/synapse/storage/presence.py
+++ b/synapse/storage/presence.py
@@ -1,5 +1,5 @@
 # -*- coding: utf-8 -*-
-# Copyright 2014 OpenMarket Ltd
+# Copyright 2014, 2015 OpenMarket Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
diff --git a/synapse/storage/profile.py b/synapse/storage/profile.py
index 7e1fdd9d88..153c7ad027 100644
--- a/synapse/storage/profile.py
+++ b/synapse/storage/profile.py
@@ -1,5 +1,5 @@
 # -*- coding: utf-8 -*-
-# Copyright 2014 OpenMarket Ltd
+# Copyright 2014, 2015 OpenMarket Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py
index 4d15005c9e..75dffa4db2 100644
--- a/synapse/storage/registration.py
+++ b/synapse/storage/registration.py
@@ -1,5 +1,5 @@
 # -*- coding: utf-8 -*-
-# Copyright 2014 OpenMarket Ltd
+# Copyright 2014, 2015 OpenMarket Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
diff --git a/synapse/storage/room.py b/synapse/storage/room.py
index 2378d65943..978b2c4a48 100644
--- a/synapse/storage/room.py
+++ b/synapse/storage/room.py
@@ -1,5 +1,5 @@
 # -*- coding: utf-8 -*-
-# Copyright 2014 OpenMarket Ltd
+# Copyright 2014, 2015 OpenMarket Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -135,26 +135,26 @@ class RoomStore(SQLBaseStore):
         defer.returnValue(ret)
 
     def _store_room_topic_txn(self, txn, event):
-        if hasattr(event, "topic"):
+        if hasattr(event, "content") and "topic" in event.content:
             self._simple_insert_txn(
                 txn,
                 "topics",
                 {
                     "event_id": event.event_id,
                     "room_id": event.room_id,
-                    "topic": event.topic,
+                    "topic": event.content["topic"],
                 }
             )
 
     def _store_room_name_txn(self, txn, event):
-        if hasattr(event, "name"):
+        if hasattr(event, "content") and "name" in event.content:
             self._simple_insert_txn(
                 txn,
                 "room_names",
                 {
                     "event_id": event.event_id,
                     "room_id": event.room_id,
-                    "name": event.name,
+                    "name": event.content["name"],
                 }
             )
 
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 05b275663e..e59e65529b 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -1,5 +1,5 @@
 # -*- coding: utf-8 -*-
-# Copyright 2014 OpenMarket Ltd
+# Copyright 2014, 2015 OpenMarket Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -15,6 +15,8 @@
 
 from twisted.internet import defer
 
+from collections import namedtuple
+
 from ._base import SQLBaseStore
 
 from synapse.api.constants import Membership
@@ -24,6 +26,12 @@ import logging
 logger = logging.getLogger(__name__)
 
 
+RoomsForUser = namedtuple(
+    "RoomsForUser",
+    ("room_id", "sender", "membership")
+)
+
+
 class RoomMemberStore(SQLBaseStore):
 
     def _store_room_member_txn(self, txn, event):
@@ -123,6 +131,19 @@ class RoomMemberStore(SQLBaseStore):
         else:
             return None
 
+    def get_users_in_room(self, room_id):
+        def f(txn):
+            sql = (
+                "SELECT m.user_id FROM room_memberships as m"
+                " INNER JOIN current_state_events as c"
+                " ON m.event_id = c.event_id"
+                " WHERE m.membership = ? AND m.room_id = ?"
+            )
+
+            txn.execute(sql, (Membership.JOIN, room_id))
+            return [r[0] for r in txn.fetchall()]
+        return self.runInteraction("get_users_in_room", f)
+
     def get_room_members(self, room_id, membership=None):
         """Retrieve the current room member list for a room.
 
@@ -150,19 +171,37 @@ class RoomMemberStore(SQLBaseStore):
             membership_list (list): A list of synapse.api.constants.Membership
             values which the user must be in.
         Returns:
-            A list of RoomMemberEvent objects
+            A list of dictionary objects, with room_id, membership and sender
+            defined.
         """
         if not membership_list:
             return defer.succeed(None)
 
-        args = [user_id]
-        args.extend(membership_list)
-
         where_clause = "user_id = ? AND (%s)" % (
             " OR ".join(["membership = ?" for _ in membership_list]),
         )
 
-        return self._get_members_query(where_clause, args)
+        args = [user_id]
+        args.extend(membership_list)
+
+        def f(txn):
+            sql = (
+                "SELECT m.room_id, m.sender, m.membership"
+                " FROM room_memberships as m"
+                " INNER JOIN current_state_events as c"
+                " ON m.event_id = c.event_id"
+                " WHERE %s"
+            ) % (where_clause,)
+
+            txn.execute(sql, args)
+            return [
+                RoomsForUser(**r) for r in self.cursor_to_dict(txn)
+            ]
+
+        return self.runInteraction(
+            "get_rooms_for_user_where_membership_is",
+            f
+        )
 
     def get_joined_hosts_for_room(self, room_id):
         return self._simple_select_onecol(
@@ -183,20 +222,14 @@ class RoomMemberStore(SQLBaseStore):
         )
 
     def _get_members_query_txn(self, txn, where_clause, where_values):
-        del_sql = (
-            "SELECT event_id FROM redactions WHERE redacts = e.event_id "
-            "LIMIT 1"
-        )
-
         sql = (
-            "SELECT e.*, (%(redacted)s) AS redacted FROM events as e "
+            "SELECT e.* FROM events as e "
             "INNER JOIN room_memberships as m "
             "ON e.event_id = m.event_id "
             "INNER JOIN current_state_events as c "
             "ON m.event_id = c.event_id "
             "WHERE %(where)s "
         ) % {
-            "redacted": del_sql,
             "where": where_clause,
         }
 
@@ -206,26 +239,28 @@ class RoomMemberStore(SQLBaseStore):
         results = self._parse_events_txn(txn, rows)
         return results
 
-    @defer.inlineCallbacks
     def user_rooms_intersect(self, user_id_list):
         """ Checks whether all the users whose IDs are given in a list share a
         room.
         """
-        user_list_clause = " OR ".join(["m.user_id = ?"] * len(user_id_list))
-        sql = (
-            "SELECT m.room_id FROM room_memberships as m "
-            "INNER JOIN current_state_events as c "
-            "ON m.event_id = c.event_id "
-            "WHERE m.membership = 'join' "
-            "AND (%(clause)s) "
-            # TODO(paul): We've got duplicate rows in the database somewhere
-            #   so we have to DISTINCT m.user_id here
-            "GROUP BY m.room_id HAVING COUNT(DISTINCT m.user_id) = ?"
-        ) % {"clause": user_list_clause}
+        def interaction(txn):
+            user_list_clause = " OR ".join(["m.user_id = ?"] * len(user_id_list))
+            sql = (
+                "SELECT m.room_id FROM room_memberships as m "
+                "INNER JOIN current_state_events as c "
+                "ON m.event_id = c.event_id "
+                "WHERE m.membership = 'join' "
+                "AND (%(clause)s) "
+                # TODO(paul): We've got duplicate rows in the database somewhere
+                #   so we have to DISTINCT m.user_id here
+                "GROUP BY m.room_id HAVING COUNT(DISTINCT m.user_id) = ?"
+            ) % {"clause": user_list_clause}
+
+            args = list(user_id_list)
+            args.append(len(user_id_list))
 
-        args = list(user_id_list)
-        args.append(len(user_id_list))
+            txn.execute(sql, args)
 
-        rows = yield self._execute(None, sql, *args)
+            return len(txn.fetchall()) > 0
 
-        defer.returnValue(len(rows) > 0)
+        return self.runInteraction("user_rooms_intersect", interaction)
diff --git a/synapse/storage/schema/delta/v11.sql b/synapse/storage/schema/delta/v11.sql
new file mode 100644
index 0000000000..313592221b
--- /dev/null
+++ b/synapse/storage/schema/delta/v11.sql
@@ -0,0 +1,16 @@
+/* Copyright 2015 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE INDEX IF NOT EXISTS sent_transaction_txn_id ON sent_transactions(transaction_id);
\ No newline at end of file
diff --git a/synapse/storage/schema/delta/v2.sql b/synapse/storage/schema/delta/v2.sql
index 73b140465e..f740f6dd5d 100644
--- a/synapse/storage/schema/delta/v2.sql
+++ b/synapse/storage/schema/delta/v2.sql
@@ -1,4 +1,4 @@
-/* Copyright 2014 OpenMarket Ltd
+/* Copyright 2014, 2015 OpenMarket Ltd
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
diff --git a/synapse/storage/schema/delta/v3.sql b/synapse/storage/schema/delta/v3.sql
index cade295989..c67e38ff52 100644
--- a/synapse/storage/schema/delta/v3.sql
+++ b/synapse/storage/schema/delta/v3.sql
@@ -1,4 +1,4 @@
-/* Copyright 2014 OpenMarket Ltd
+/* Copyright 2014, 2015 OpenMarket Ltd
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
diff --git a/synapse/storage/schema/delta/v4.sql b/synapse/storage/schema/delta/v4.sql
index 25d2ead450..d3807b7686 100644
--- a/synapse/storage/schema/delta/v4.sql
+++ b/synapse/storage/schema/delta/v4.sql
@@ -1,3 +1,17 @@
+/* Copyright 2014, 2015 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 CREATE TABLE IF NOT EXISTS redactions (
     event_id TEXT NOT NULL,
     redacts TEXT NOT NULL,
diff --git a/synapse/storage/schema/delta/v5.sql b/synapse/storage/schema/delta/v5.sql
index af9df11aa9..0874a15431 100644
--- a/synapse/storage/schema/delta/v5.sql
+++ b/synapse/storage/schema/delta/v5.sql
@@ -1,3 +1,17 @@
+/* Copyright 2014, 2015 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 
 CREATE TABLE IF NOT EXISTS user_ips (
     user TEXT NOT NULL,
diff --git a/synapse/storage/schema/delta/v6.sql b/synapse/storage/schema/delta/v6.sql
index 9bf2068d84..a9e0a4fe0d 100644
--- a/synapse/storage/schema/delta/v6.sql
+++ b/synapse/storage/schema/delta/v6.sql
@@ -1,4 +1,4 @@
-/* Copyright 2014 OpenMarket Ltd
+/* Copyright 2014, 2015 OpenMarket Ltd
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
diff --git a/synapse/storage/schema/delta/v8.sql b/synapse/storage/schema/delta/v8.sql
index daf6646ed5..1e9f8b18cb 100644
--- a/synapse/storage/schema/delta/v8.sql
+++ b/synapse/storage/schema/delta/v8.sql
@@ -1,4 +1,4 @@
-/* Copyright 2014 OpenMarket Ltd
+/* Copyright 2014, 2015 OpenMarket Ltd
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
diff --git a/synapse/storage/schema/delta/v9.sql b/synapse/storage/schema/delta/v9.sql
index 0af29733a0..455d51a70c 100644
--- a/synapse/storage/schema/delta/v9.sql
+++ b/synapse/storage/schema/delta/v9.sql
@@ -1,4 +1,4 @@
-/* Copyright 2014 OpenMarket Ltd
+/* Copyright 2014, 2015 OpenMarket Ltd
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
diff --git a/synapse/storage/schema/event_edges.sql b/synapse/storage/schema/event_edges.sql
index be1c72a775..1e766d6db2 100644
--- a/synapse/storage/schema/event_edges.sql
+++ b/synapse/storage/schema/event_edges.sql
@@ -1,3 +1,17 @@
+/* Copyright 2014, 2015 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 
 CREATE TABLE IF NOT EXISTS event_forward_extremities(
     event_id TEXT NOT NULL,
diff --git a/synapse/storage/schema/event_signatures.sql b/synapse/storage/schema/event_signatures.sql
index b6b56b47a2..c28c39c48a 100644
--- a/synapse/storage/schema/event_signatures.sql
+++ b/synapse/storage/schema/event_signatures.sql
@@ -1,4 +1,4 @@
-/* Copyright 2014 OpenMarket Ltd
+/* Copyright 2014, 2015 OpenMarket Ltd
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
diff --git a/synapse/storage/schema/im.sql b/synapse/storage/schema/im.sql
index 253f9f779b..dd00c1cd2f 100644
--- a/synapse/storage/schema/im.sql
+++ b/synapse/storage/schema/im.sql
@@ -1,4 +1,4 @@
-/* Copyright 2014 OpenMarket Ltd
+/* Copyright 2014, 2015 OpenMarket Ltd
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
diff --git a/synapse/storage/schema/keys.sql b/synapse/storage/schema/keys.sql
index 9bf2068d84..a9e0a4fe0d 100644
--- a/synapse/storage/schema/keys.sql
+++ b/synapse/storage/schema/keys.sql
@@ -1,4 +1,4 @@
-/* Copyright 2014 OpenMarket Ltd
+/* Copyright 2014, 2015 OpenMarket Ltd
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
diff --git a/synapse/storage/schema/media_repository.sql b/synapse/storage/schema/media_repository.sql
index b785fa0208..afdf48cbfb 100644
--- a/synapse/storage/schema/media_repository.sql
+++ b/synapse/storage/schema/media_repository.sql
@@ -1,4 +1,4 @@
-/* Copyright 2014 OpenMarket Ltd
+/* Copyright 2014, 2015 OpenMarket Ltd
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
diff --git a/synapse/storage/schema/presence.sql b/synapse/storage/schema/presence.sql
index 595b3b5a69..f9f8db9697 100644
--- a/synapse/storage/schema/presence.sql
+++ b/synapse/storage/schema/presence.sql
@@ -1,4 +1,4 @@
-/* Copyright 2014 OpenMarket Ltd
+/* Copyright 2014, 2015 OpenMarket Ltd
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
diff --git a/synapse/storage/schema/profiles.sql b/synapse/storage/schema/profiles.sql
index 58209f1af0..f06a528b4d 100644
--- a/synapse/storage/schema/profiles.sql
+++ b/synapse/storage/schema/profiles.sql
@@ -1,4 +1,4 @@
-/* Copyright 2014 OpenMarket Ltd
+/* Copyright 2014, 2015 OpenMarket Ltd
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
diff --git a/synapse/storage/schema/redactions.sql b/synapse/storage/schema/redactions.sql
index 4c2829d05d..5011d95db8 100644
--- a/synapse/storage/schema/redactions.sql
+++ b/synapse/storage/schema/redactions.sql
@@ -1,3 +1,17 @@
+/* Copyright 2014, 2015 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 CREATE TABLE IF NOT EXISTS redactions (
     event_id TEXT NOT NULL,
     redacts TEXT NOT NULL,
diff --git a/synapse/storage/schema/room_aliases.sql b/synapse/storage/schema/room_aliases.sql
index 9191016814..0d2df01603 100644
--- a/synapse/storage/schema/room_aliases.sql
+++ b/synapse/storage/schema/room_aliases.sql
@@ -1,4 +1,4 @@
-/* Copyright 2014 OpenMarket Ltd
+/* Copyright 2014, 2015 OpenMarket Ltd
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
diff --git a/synapse/storage/schema/state.sql b/synapse/storage/schema/state.sql
index 2c48d6daca..1fe8f1e430 100644
--- a/synapse/storage/schema/state.sql
+++ b/synapse/storage/schema/state.sql
@@ -1,4 +1,4 @@
-/* Copyright 2014 OpenMarket Ltd
+/* Copyright 2014, 2015 OpenMarket Ltd
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
diff --git a/synapse/storage/schema/transactions.sql b/synapse/storage/schema/transactions.sql
index de461bfa15..2d30f99b06 100644
--- a/synapse/storage/schema/transactions.sql
+++ b/synapse/storage/schema/transactions.sql
@@ -1,4 +1,4 @@
-/* Copyright 2014 OpenMarket Ltd
+/* Copyright 2014, 2015 OpenMarket Ltd
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -42,6 +42,7 @@ CREATE INDEX IF NOT EXISTS sent_transaction_dest ON sent_transactions(destinatio
 CREATE INDEX IF NOT EXISTS sent_transaction_dest_referenced ON sent_transactions(
     destination
 );
+CREATE INDEX IF NOT EXISTS sent_transaction_txn_id ON sent_transactions(transaction_id);
 -- So that we can do an efficient look up of all transactions that have yet to be successfully
 -- sent.
 CREATE INDEX IF NOT EXISTS sent_transaction_sent ON sent_transactions(response_code);
diff --git a/synapse/storage/schema/users.sql b/synapse/storage/schema/users.sql
index 8244f733bd..08ccfdac0a 100644
--- a/synapse/storage/schema/users.sql
+++ b/synapse/storage/schema/users.sql
@@ -1,4 +1,4 @@
-/* Copyright 2014 OpenMarket Ltd
+/* Copyright 2014, 2015 OpenMarket Ltd
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
diff --git a/synapse/storage/signatures.py b/synapse/storage/signatures.py
index 3a705119fd..d0d53770f2 100644
--- a/synapse/storage/signatures.py
+++ b/synapse/storage/signatures.py
@@ -1,5 +1,5 @@
 # -*- coding: utf-8 -*-
-# Copyright 2014 OpenMarket Ltd
+# Copyright 2014, 2015 OpenMarket Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index afe3e5edea..5327517704 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -1,5 +1,5 @@
 # -*- coding: utf-8 -*-
-# Copyright 2014 OpenMarket Ltd
+# Copyright 2014, 2015 OpenMarket Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -15,6 +15,10 @@
 
 from ._base import SQLBaseStore
 
+import logging
+
+logger = logging.getLogger(__name__)
+
 
 class StateStore(SQLBaseStore):
     """ Keeps track of the state at a given event.
@@ -62,14 +66,8 @@ class StateStore(SQLBaseStore):
                     keyvalues={"state_group": group},
                     retcol="event_id",
                 )
-                state = []
-                for state_id in state_ids:
-                    s = self._get_events_txn(
-                        txn,
-                        [state_id],
-                    )
-                    if s:
-                        state.extend(s)
+
+                state = self._get_events_txn(txn, state_ids)
 
                 res[group] = state
 
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index 3405cb365e..744c821dfe 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -1,5 +1,5 @@
 # -*- coding: utf-8 -*-
-# Copyright 2014 OpenMarket Ltd
+# Copyright 2014, 2015 OpenMarket Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -137,7 +137,6 @@ class StreamStore(SQLBaseStore):
                 with_feedback=with_feedback,
             )
 
-    @defer.inlineCallbacks
     @log_function
     def get_room_events_stream(self, user_id, from_key, to_key, room_id,
                                limit=0, with_feedback=False):
@@ -157,11 +156,6 @@ class StreamStore(SQLBaseStore):
             "WHERE m.user_id = ? "
         )
 
-        del_sql = (
-            "SELECT event_id FROM redactions WHERE redacts = e.event_id "
-            "LIMIT 1"
-        )
-
         if limit:
             limit = max(limit, MAX_STREAM_SIZE)
         else:
@@ -172,38 +166,42 @@ class StreamStore(SQLBaseStore):
         to_id = _parse_stream_token(to_key)
 
         if from_key == to_key:
-            defer.returnValue(([], to_key))
-            return
+            return defer.succeed(([], to_key))
 
         sql = (
-            "SELECT *, (%(redacted)s) AS redacted FROM events AS e WHERE "
+            "SELECT e.event_id, e.stream_ordering FROM events AS e WHERE "
             "(e.outlier = 0 AND (room_id IN (%(current)s)) OR "
             "(event_id IN (%(invites)s))) "
             "AND e.stream_ordering > ? AND e.stream_ordering <= ? "
             "ORDER BY stream_ordering ASC LIMIT %(limit)d "
         ) % {
-            "redacted": del_sql,
             "current": current_room_membership_sql,
             "invites": membership_sql,
             "limit": limit
         }
 
-        rows = yield self._execute_and_decode(
-            sql,
-            user_id, user_id, from_id, to_id
-        )
+        def f(txn):
+            txn.execute(sql, (user_id, user_id, from_id, to_id,))
 
-        ret = yield self._parse_events(rows)
+            rows = self.cursor_to_dict(txn)
 
-        if rows:
-            key = "s%d" % max([r["stream_ordering"] for r in rows])
-        else:
-            # Assume we didn't get anything because there was nothing to get.
-            key = to_key
+            ret = self._get_events_txn(
+                txn,
+                [r["event_id"] for r in rows],
+                get_prev_content=True
+            )
+
+            if rows:
+                key = "s%d" % max([r["stream_ordering"] for r in rows])
+            else:
+                # Assume we didn't get anything because there was nothing to
+                # get.
+                key = to_key
+
+            return ret, key
 
-        defer.returnValue((ret, key))
+        return self.runInteraction("get_room_events_stream", f)
 
-    @defer.inlineCallbacks
     @log_function
     def paginate_room_events(self, room_id, from_key, to_key=None,
                              direction='b', limit=-1,
@@ -221,7 +219,9 @@ class StreamStore(SQLBaseStore):
 
         bounds = _get_token_bound(from_key, from_comp)
         if to_key:
-            bounds = "%s AND %s" % (bounds, _get_token_bound(to_key, to_comp))
+            bounds = "%s AND %s" % (
+                bounds, _get_token_bound(to_key, to_comp)
+            )
 
         if int(limit) > 0:
             args.append(int(limit))
@@ -229,87 +229,82 @@ class StreamStore(SQLBaseStore):
         else:
             limit_str = ""
 
-        del_sql = (
-            "SELECT event_id FROM redactions WHERE redacts = events.event_id "
-            "LIMIT 1"
-        )
-
         sql = (
-            "SELECT *, (%(redacted)s) AS redacted FROM events"
+            "SELECT * FROM events"
             " WHERE outlier = 0 AND room_id = ? AND %(bounds)s"
             " ORDER BY topological_ordering %(order)s,"
             " stream_ordering %(order)s %(limit)s"
         ) % {
-            "redacted": del_sql,
             "bounds": bounds,
             "order": order,
             "limit": limit_str
         }
 
-        rows = yield self._execute_and_decode(
-            sql,
-            *args
-        )
-
-        if rows:
-            topo = rows[-1]["topological_ordering"]
-            toke = rows[-1]["stream_ordering"]
-            if direction == 'b':
-                topo -= 1
-                toke -= 1
-            next_token = "t%s-%s" % (topo, toke)
-        else:
-            # TODO (erikj): We should work out what to do here instead.
-            next_token = to_key if to_key else from_key
+        def f(txn):
+            txn.execute(sql, args)
+
+            rows = self.cursor_to_dict(txn)
+
+            if rows:
+                topo = rows[-1]["topological_ordering"]
+                toke = rows[-1]["stream_ordering"]
+                if direction == 'b':
+                    topo -= 1
+                    toke -= 1
+                next_token = "t%s-%s" % (topo, toke)
+            else:
+                # TODO (erikj): We should work out what to do here instead.
+                next_token = to_key if to_key else from_key
+
+            events = self._get_events_txn(
+                txn,
+                [r["event_id"] for r in rows],
+                get_prev_content=True
+            )
 
-        events = yield self._parse_events(rows)
+            return events, next_token,
 
-        defer.returnValue(
-            (
-                events,
-                next_token
-            )
-        )
+        return self.runInteraction("paginate_room_events", f)
 
-    @defer.inlineCallbacks
     def get_recent_events_for_room(self, room_id, limit, end_token,
                                    with_feedback=False):
         # TODO (erikj): Handle compressed feedback
 
-        del_sql = (
-            "SELECT event_id FROM redactions WHERE redacts = events.event_id "
-            "LIMIT 1"
-        )
-
         sql = (
-            "SELECT *, (%(redacted)s) AS redacted FROM events "
+            "SELECT stream_ordering, topological_ordering, event_id FROM events "
             "WHERE room_id = ? AND stream_ordering <= ? AND outlier = 0 "
             "ORDER BY topological_ordering DESC, stream_ordering DESC LIMIT ? "
-        ) % {
-            "redacted": del_sql,
-        }
-
-        rows = yield self._execute_and_decode(
-            sql,
-            room_id, end_token, limit
         )
 
-        rows.reverse()  # As we selected with reverse ordering
+        def f(txn):
+            txn.execute(sql, (room_id, end_token, limit,))
 
-        if rows:
-            topo = rows[0]["topological_ordering"]
-            toke = rows[0]["stream_ordering"]
-            start_token = "t%s-%s" % (topo, toke)
+            rows = self.cursor_to_dict(txn)
 
-            token = (start_token, end_token)
-        else:
-            token = (end_token, end_token)
+            rows.reverse()  # As we selected with reverse ordering
 
-        events = yield self._parse_events(rows)
+            if rows:
+                # XXX: Always subtract 1 since the start token always goes
+                # backwards (parity with paginate_room_events). It isn't
+                # obvious that this is correct; we should clarify the algorithm
+                # used here.
+                topo = rows[0]["topological_ordering"] - 1
+                toke = rows[0]["stream_ordering"] - 1
+                start_token = "t%s-%s" % (topo, toke)
+
+                token = (start_token, end_token)
+            else:
+                token = (end_token, end_token)
+
+            events = self._get_events_txn(
+                txn,
+                [r["event_id"] for r in rows],
+                get_prev_content=True
+            )
 
-        ret = (events, token)
+            return events, token
 
-        defer.returnValue(ret)
+        return self.runInteraction("get_recent_events_for_room", f)
 
     def get_room_events_max_id(self):
         return self.runInteraction(
diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py
index 423cc3f02a..e06ef35690 100644
--- a/synapse/storage/transactions.py
+++ b/synapse/storage/transactions.py
@@ -1,5 +1,5 @@
 # -*- coding: utf-8 -*-
-# Copyright 2014 OpenMarket Ltd
+# Copyright 2014, 2015 OpenMarket Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.