summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--scripts/port_from_sqlite_to_postgres.py611
-rwxr-xr-xsynapse/app/homeserver.py64
-rw-r--r--synapse/config/database.py9
-rw-r--r--synapse/handlers/_base.py15
-rw-r--r--synapse/handlers/federation.py2
-rw-r--r--synapse/handlers/login.py2
-rw-r--r--synapse/handlers/message.py28
-rw-r--r--synapse/handlers/room.py19
-rw-r--r--synapse/notifier.py2
-rw-r--r--synapse/push/pusherpool.py8
-rw-r--r--synapse/rest/client/v1/profile.py7
-rw-r--r--synapse/storage/__init__.py134
-rw-r--r--synapse/storage/_base.py135
-rw-r--r--synapse/storage/appservice.py4
-rw-r--r--synapse/storage/directory.py8
-rw-r--r--synapse/storage/engines/__init__.py37
-rw-r--r--synapse/storage/engines/postgres.py41
-rw-r--r--synapse/storage/engines/sqlite3.py37
-rw-r--r--synapse/storage/event_federation.py35
-rw-r--r--synapse/storage/events.py88
-rw-r--r--synapse/storage/keys.py16
-rw-r--r--synapse/storage/presence.py1
-rw-r--r--synapse/storage/profile.py7
-rw-r--r--synapse/storage/push_rule.py4
-rw-r--r--synapse/storage/pusher.py33
-rw-r--r--synapse/storage/registration.py45
-rw-r--r--synapse/storage/room.py39
-rw-r--r--synapse/storage/roommember.py148
-rw-r--r--synapse/storage/schema/delta/12/v12.sql52
-rw-r--r--synapse/storage/schema/delta/13/v13.sql17
-rw-r--r--synapse/storage/schema/delta/14/v14.sql4
-rw-r--r--synapse/storage/schema/delta/15/appservice_txns.sql15
-rw-r--r--synapse/storage/schema/delta/15/presence_indices.sql2
-rw-r--r--synapse/storage/schema/delta/16/events_order_index.sql4
-rw-r--r--synapse/storage/schema/delta/16/remote_media_cache_index.sql2
-rw-r--r--synapse/storage/schema/delta/16/remove_duplicates.sql9
-rw-r--r--synapse/storage/schema/delta/16/room_alias_index.sql3
-rw-r--r--synapse/storage/schema/delta/16/unique_constraints.sql80
-rw-r--r--synapse/storage/schema/delta/16/users.sql56
-rw-r--r--synapse/storage/schema/full_schemas/11/event_edges.sql76
-rw-r--r--synapse/storage/schema/full_schemas/11/event_signatures.sql54
-rw-r--r--synapse/storage/schema/full_schemas/11/im.sql124
-rw-r--r--synapse/storage/schema/full_schemas/11/keys.sql24
-rw-r--r--synapse/storage/schema/full_schemas/11/media_repository.sql55
-rw-r--r--synapse/storage/schema/full_schemas/11/presence.sql21
-rw-r--r--synapse/storage/schema/full_schemas/11/profiles.sql7
-rw-r--r--synapse/storage/schema/full_schemas/11/redactions.sql10
-rw-r--r--synapse/storage/schema/full_schemas/11/room_aliases.sql11
-rw-r--r--synapse/storage/schema/full_schemas/11/state.sql31
-rw-r--r--synapse/storage/schema/full_schemas/11/transactions.sql45
-rw-r--r--synapse/storage/schema/full_schemas/11/users.sql36
-rw-r--r--synapse/storage/schema/full_schemas/16/application_services.sql48
-rw-r--r--synapse/storage/schema/full_schemas/16/event_edges.sql89
-rw-r--r--synapse/storage/schema/full_schemas/16/event_signatures.sql55
-rw-r--r--synapse/storage/schema/full_schemas/16/im.sql128
-rw-r--r--synapse/storage/schema/full_schemas/16/keys.sql31
-rw-r--r--synapse/storage/schema/full_schemas/16/media_repository.sql68
-rw-r--r--synapse/storage/schema/full_schemas/16/presence.sql40
-rw-r--r--synapse/storage/schema/full_schemas/16/profiles.sql20
-rw-r--r--synapse/storage/schema/full_schemas/16/push.sql73
-rw-r--r--synapse/storage/schema/full_schemas/16/redactions.sql22
-rw-r--r--synapse/storage/schema/full_schemas/16/room_aliases.sql29
-rw-r--r--synapse/storage/schema/full_schemas/16/state.sql40
-rw-r--r--synapse/storage/schema/full_schemas/16/transactions.sql63
-rw-r--r--synapse/storage/schema/full_schemas/16/users.sql42
-rw-r--r--synapse/storage/schema/schema_version.sql11
-rw-r--r--synapse/storage/signatures.py6
-rw-r--r--synapse/storage/state.py13
-rw-r--r--synapse/storage/stream.py51
-rw-r--r--synapse/storage/transactions.py208
-rw-r--r--synapse/storage/util/__init__.py14
-rw-r--r--synapse/storage/util/id_generators.py130
-rw-r--r--synapse/util/lrucache.py20
-rw-r--r--synapse/util/retryutils.py2
-rw-r--r--tests/federation/test_federation.py10
-rw-r--r--tests/handlers/test_federation.py9
-rw-r--r--tests/handlers/test_presence.py7
-rw-r--r--tests/handlers/test_typing.py7
-rw-r--r--tests/rest/client/v1/test_events.py9
-rw-r--r--tests/storage/test_appservice.py31
-rw-r--r--tests/storage/test_base.py26
-rw-r--r--tests/storage/test_registration.py36
-rw-r--r--tests/storage/test_roommember.py14
-rw-r--r--tests/utils.py11
84 files changed, 2805 insertions, 875 deletions
diff --git a/scripts/port_from_sqlite_to_postgres.py b/scripts/port_from_sqlite_to_postgres.py
new file mode 100644
index 0000000000..fc1603c1c9
--- /dev/null
+++ b/scripts/port_from_sqlite_to_postgres.py
@@ -0,0 +1,611 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from twisted.internet import defer, reactor
+from twisted.enterprise import adbapi
+
+from synapse.storage._base import LoggingTransaction, SQLBaseStore
+from synapse.storage.engines import create_engine
+
+import argparse
+import curses
+import logging
+import sys
+import time
+import traceback
+import yaml
+
+
+logger = logging.getLogger("port_from_sqlite_to_postgres")
+
+
+BOOLEAN_COLUMNS = {
+    "events": ["processed", "outlier"],
+    "rooms": ["is_public"],
+    "event_edges": ["is_state"],
+    "presence_list": ["accepted"],
+}
+
+
+APPEND_ONLY_TABLES = [
+    "event_content_hashes",
+    "event_reference_hashes",
+    "event_signatures",
+    "event_edge_hashes",
+    "events",
+    "event_json",
+    "state_events",
+    "room_memberships",
+    "feedback",
+    "topics",
+    "room_names",
+    "rooms",
+    "local_media_repository",
+    "local_media_repository_thumbnails",
+    "remote_media_cache",
+    "remote_media_cache_thumbnails",
+    "redactions",
+    "event_edges",
+    "event_auth",
+    "received_transactions",
+    "sent_transactions",
+    "transaction_id_to_pdu",
+    "users",
+    "state_groups",
+    "state_groups_state",
+    "event_to_state_groups",
+    "rejections",
+]
+
+
+end_error_exec_info = None
+
+
+class Store(object):
+    """This object is used to pull out some of the convenience API from the
+    Storage layer.
+
+    *All* database interactions should go through this object.
+    """
+    def __init__(self, db_pool, engine):
+        self.db_pool = db_pool
+        self.database_engine = engine
+
+    _simple_insert_txn = SQLBaseStore.__dict__["_simple_insert_txn"]
+    _simple_insert = SQLBaseStore.__dict__["_simple_insert"]
+
+    _simple_select_onecol_txn = SQLBaseStore.__dict__["_simple_select_onecol_txn"]
+    _simple_select_onecol = SQLBaseStore.__dict__["_simple_select_onecol"]
+    _simple_select_one_onecol = SQLBaseStore.__dict__["_simple_select_one_onecol"]
+    _simple_select_one_onecol_txn = SQLBaseStore.__dict__["_simple_select_one_onecol_txn"]
+
+    _simple_update_one = SQLBaseStore.__dict__["_simple_update_one"]
+    _simple_update_one_txn = SQLBaseStore.__dict__["_simple_update_one_txn"]
+
+    _execute_and_decode = SQLBaseStore.__dict__["_execute_and_decode"]
+
+    def runInteraction(self, desc, func, *args, **kwargs):
+        def r(conn):
+            try:
+                i = 0
+                N = 5
+                while True:
+                    try:
+                        txn = conn.cursor()
+                        return func(
+                            LoggingTransaction(txn, desc, self.database_engine),
+                            *args, **kwargs
+                        )
+                    except self.database_engine.module.DatabaseError as e:
+                        if self.database_engine.is_deadlock(e):
+                            logger.warn("[TXN DEADLOCK] {%s} %d/%d", desc, i, N)
+                            if i < N:
+                                i += 1
+                                conn.rollback()
+                                continue
+                        raise
+            except Exception as e:
+                logger.debug("[TXN FAIL] {%s} %s", desc, e)
+                raise
+
+        return self.db_pool.runWithConnection(r)
+
+    def execute(self, f):
+        return self.runInteraction(f.__name__, f)
+
+    def insert_many_txn(self, txn, table, headers, rows):
+        sql = "INSERT INTO %s (%s) VALUES (%s)" % (
+            table,
+            ", ".join(k for k in headers),
+            ", ".join("%s" for _ in headers)
+        )
+
+        try:
+            txn.executemany(sql, rows)
+        except:
+            logger.exception(
+                "Failed to insert: %s",
+                table,
+            )
+            raise
+
+
+class Progress(object):
+    """Used to report progress of the port
+    """
+    def __init__(self):
+        self.tables = {}
+
+        self.start_time = int(time.time())
+
+    def add_table(self, table, cur, size):
+        self.tables[table] = {
+            "start": cur,
+            "num_done": cur,
+            "total": size,
+            "perc": int(cur * 100 / size),
+        }
+
+    def update(self, table, num_done):
+        data = self.tables[table]
+        data["num_done"] = num_done
+        data["perc"] = int(num_done * 100 / data["total"])
+
+    def done(self):
+        pass
+
+
+class CursesProgress(Progress):
+    """Reports progress to a curses window
+    """
+    def __init__(self, stdscr):
+        self.stdscr = stdscr
+
+        curses.use_default_colors()
+        curses.curs_set(0)
+
+        curses.init_pair(1, curses.COLOR_RED, -1)
+        curses.init_pair(2, curses.COLOR_GREEN, -1)
+
+        self.last_update = 0
+
+        self.finished = False
+
+        super(CursesProgress, self).__init__()
+
+    def update(self, table, num_done):
+        super(CursesProgress, self).update(table, num_done)
+
+        self.render()
+
+    def render(self, force=False):
+        now = time.time()
+
+        if not force and now - self.last_update < 0.2:
+            # reactor.callLater(1, self.render)
+            return
+
+        self.stdscr.clear()
+
+        rows, cols = self.stdscr.getmaxyx()
+
+        duration = int(now) - int(self.start_time)
+
+        minutes, seconds = divmod(duration, 60)
+        duration_str = '%02dm %02ds' % (minutes, seconds,)
+
+        if self.finished:
+            status = "Time spent: %s (Done!)" % (duration_str,)
+        else:
+            min_perc = min(
+                (v["num_done"] - v["start"]) * 100. / (v["total"] - v["start"])
+                if v["total"] - v["start"] else 100
+                for v in self.tables.values()
+            )
+            if min_perc > 0:
+                est_remaining = (int(now) - self.start_time) * 100 / min_perc
+                est_remaining_str = '%02dm %02ds remaining' % divmod(est_remaining, 60)
+            else:
+                est_remaining_str = "Unknown"
+            status = (
+                "Time spent: %s (est. remaining: %s)"
+                % (duration_str, est_remaining_str,)
+            )
+
+        self.stdscr.addstr(
+            0, 0,
+            status,
+            curses.A_BOLD,
+        )
+
+        max_len = max([len(t) for t in self.tables.keys()])
+
+        left_margin = 5
+        middle_space = 1
+
+        items = self.tables.items()
+        items.sort(
+            key=lambda i: (i[1]["perc"], i[0]),
+        )
+
+        for i, (table, data) in enumerate(items):
+            if i + 2 >= rows:
+                break
+
+            perc = data["perc"]
+
+            color = curses.color_pair(2) if perc == 100 else curses.color_pair(1)
+
+            self.stdscr.addstr(
+                i+2, left_margin + max_len - len(table),
+                table,
+                curses.A_BOLD | color,
+            )
+
+            size = 20
+
+            progress = "[%s%s]" % (
+                "#" * int(perc*size/100),
+                " " * (size - int(perc*size/100)),
+            )
+
+            self.stdscr.addstr(
+                i+2, left_margin + max_len + middle_space,
+                "%s %3d%% (%d/%d)" % (progress, perc, data["num_done"], data["total"]),
+            )
+
+        if self.finished:
+            self.stdscr.addstr(
+                self.rows-1, 0 ,
+                "Press any key to exit...",
+            )
+
+        self.stdscr.refresh()
+        self.last_update = time.time()
+
+    def done(self):
+        self.finished = True
+        self.render(True)
+        self.stdscr.getch()
+
+    def on_prepare_sqlite(self):
+        self.stdscr.clear()
+        self.stdscr.addstr(
+            0, 0,
+            "Preparing SQLite database...",
+            curses.A_BOLD,
+        )
+        self.stdscr.refresh()
+
+    def on_prepare_postgres(self):
+        self.stdscr.clear()
+        self.stdscr.addstr(
+            0, 0,
+            "Preparing PostgreSQL database...",
+            curses.A_BOLD,
+        )
+        self.stdscr.refresh()
+
+    def fetching_tables(self):
+        self.stdscr.clear()
+        self.stdscr.addstr(
+            0, 0,
+            "Fetching tables...",
+            curses.A_BOLD,
+        )
+        self.stdscr.refresh()
+
+    def preparing_tables(self):
+        self.stdscr.clear()
+        self.stdscr.addstr(
+            0, 0,
+            "Preparing tables...",
+            curses.A_BOLD,
+        )
+        self.stdscr.refresh()
+
+
+class TerminalProgress(Progress):
+    """Just prints progress to the terminal
+    """
+    def update(self, table, num_done):
+        super(TerminalProgress, self).update(table, num_done)
+
+        data = self.tables[table]
+
+        print "%s: %d%% (%d/%d)" % (
+            table, data["perc"],
+            data["num_done"], data["total"],
+        )
+
+    def on_prepare_sqlite(self):
+        print "Preparing SQLite database..."
+
+    def on_prepare_postgres(self):
+        print "Preparing PostgreSQL database..."
+
+    def fetching_tables(self):
+        print "Fetching tables..."
+
+    def preparing_tables(self):
+        print "Preparing tables..."
+
+
+class Porter(object):
+    def __init__(self, **kwargs):
+        self.__dict__.update(kwargs)
+
+    @defer.inlineCallbacks
+    def handle_table(self, table):
+        if table in APPEND_ONLY_TABLES:
+            # It's safe to just carry on inserting.
+            next_chunk = yield self.postgres_store._simple_select_one_onecol(
+                table="port_from_sqlite3",
+                keyvalues={"table_name": table},
+                retcol="rowid",
+                allow_none=True,
+            )
+
+            if next_chunk is None:
+                yield self.postgres_store._simple_insert(
+                    table="port_from_sqlite3",
+                    values={"table_name": table, "rowid": 1}
+                )
+
+                next_chunk = 1
+        else:
+            def delete_all(txn):
+                txn.execute(
+                    "DELETE FROM port_from_sqlite3 WHERE table_name = %s",
+                    (table,)
+                )
+                txn.execute("TRUNCATE %s CASCADE" % (table,))
+                self.postgres_store._simple_insert_txn(
+                    txn,
+                    table="port_from_sqlite3",
+                    values={"table_name": table, "rowid": 0}
+                )
+            yield self.postgres_store.execute(delete_all)
+
+            next_chunk = 1
+
+        def get_table_size(txn):
+            txn.execute("SELECT count(*) FROM %s" % (table,))
+            size, = txn.fetchone()
+            return int(size)
+
+        table_size = yield self.sqlite_store.execute(get_table_size)
+        postgres_size = yield self.postgres_store.execute(get_table_size)
+
+        if not table_size:
+            return
+
+        self.progress.add_table(table, postgres_size, table_size)
+
+        select = (
+            "SELECT rowid, * FROM %s WHERE rowid >= ? ORDER BY rowid LIMIT ?"
+            % (table,)
+        )
+
+        bool_col_names = BOOLEAN_COLUMNS.get(table, [])
+
+        while True:
+            def r(txn):
+                txn.execute(select, (next_chunk, self.batch_size,))
+                rows = txn.fetchall()
+                headers = [column[0] for column in txn.description]
+
+                return headers, rows
+
+            headers, rows = yield self.sqlite_store.runInteraction("select", r)
+
+            if rows:
+                bool_cols = [
+                    i for i, h in enumerate(headers) if h in bool_col_names
+                ]
+                next_chunk = rows[-1][0] + 1
+
+                def conv(j, col):
+                    if j in bool_cols:
+                        return bool(col)
+                    return col
+
+                for i, row in enumerate(rows):
+                    rows[i] = tuple(
+                        self.postgres_store.database_engine.encode_parameter(
+                            conv(j, col)
+                        )
+                        for j, col in enumerate(row)
+                        if j > 0
+                    )
+
+                def insert(txn):
+                    self.postgres_store.insert_many_txn(
+                        txn, table, headers[1:], rows
+                    )
+
+                    self.postgres_store._simple_update_one_txn(
+                        txn,
+                        table="port_from_sqlite3",
+                        keyvalues={"table_name": table},
+                        updatevalues={"rowid": next_chunk},
+                    )
+
+                yield self.postgres_store.execute(insert)
+
+                postgres_size += len(rows)
+
+                self.progress.update(table, postgres_size)
+            else:
+                return
+
+    def setup_db(self, db_config, database_engine):
+        db_conn = database_engine.module.connect(
+            **{
+                k: v for k, v in db_config.get("args", {}).items()
+                if not k.startswith("cp_")
+            }
+        )
+
+        database_engine.prepare_database(db_conn)
+
+        db_conn.commit()
+
+    @defer.inlineCallbacks
+    def run(self):
+        try:
+            sqlite_db_pool = adbapi.ConnectionPool(
+                self.sqlite_config["name"],
+                **self.sqlite_config["args"]
+            )
+
+            postgres_db_pool = adbapi.ConnectionPool(
+                self.postgres_config["name"],
+                **self.postgres_config["args"]
+            )
+
+            sqlite_engine = create_engine("sqlite3")
+            postgres_engine = create_engine("psycopg2")
+
+            self.sqlite_store = Store(sqlite_db_pool, sqlite_engine)
+            self.postgres_store = Store(postgres_db_pool, postgres_engine)
+
+            # Step 1. Set up databases.
+            self.progress.on_prepare_sqlite()
+            self.setup_db(sqlite_config, sqlite_engine)
+
+            self.progress.on_prepare_postgres()
+            self.setup_db(postgres_config, postgres_engine)
+
+            # Step 2. Get tables.
+            self.progress.fetching_tables()
+            sqlite_tables = yield self.sqlite_store._simple_select_onecol(
+                table="sqlite_master",
+                keyvalues={
+                    "type": "table",
+                },
+                retcol="name",
+            )
+
+            postgres_tables = yield self.postgres_store._simple_select_onecol(
+                table="information_schema.tables",
+                keyvalues={
+                    "table_schema": "public",
+                },
+                retcol="distinct table_name",
+            )
+
+            tables = set(sqlite_tables) & set(postgres_tables)
+
+            self.progress.preparing_tables()
+
+            logger.info("Found %d tables", len(tables))
+
+            def create_port_table(txn):
+                txn.execute(
+                    "CREATE TABLE port_from_sqlite3 ("
+                    " table_name varchar(100) NOT NULL UNIQUE,"
+                    " rowid bigint NOT NULL"
+                    ")"
+                )
+
+            try:
+                yield self.postgres_store.runInteraction(
+                    "create_port_table", create_port_table
+                )
+            except Exception as e:
+                logger.info("Failed to create port table: %s", e)
+
+            # Process tables.
+            yield defer.gatherResults(
+                [
+                    self.handle_table(table)
+                    for table in tables
+                    if table not in ["schema_version", "applied_schema_deltas"]
+                    and not table.startswith("sqlite_")
+                ],
+                consumeErrors=True,
+            )
+
+            self.progress.done()
+        except:
+            global end_error_exec_info
+            end_error_exec_info = sys.exc_info()
+            logger.exception("")
+        finally:
+            reactor.stop()
+
+
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser()
+    parser.add_argument("-v", action='store_true')
+    parser.add_argument("--curses", action='store_true')
+    parser.add_argument("--sqlite-database")
+    parser.add_argument(
+        "--postgres-config", type=argparse.FileType('r'),
+    )
+
+    parser.add_argument("--batch-size", type=int, default=1000)
+
+    args = parser.parse_args()
+
+
+    logging_config = {
+        "level": logging.DEBUG if args.v else logging.INFO,
+        "format": "%(asctime)s - %(name)s - %(lineno)d - %(levelname)s - %(message)s"
+    }
+
+    if args.curses:
+        logging_config["filename"] = "port-synapse.log"
+
+    logging.basicConfig(**logging_config)
+
+    sqlite_config = {
+        "name": "sqlite3",
+        "args": {
+            "database": args.sqlite_database,
+            "cp_min": 1,
+            "cp_max": 1,
+            "check_same_thread": False,
+        },
+    }
+
+    postgres_config = yaml.safe_load(args.postgres_config)
+
+    def start(stdscr=None):
+        if stdscr:
+            progress = CursesProgress(stdscr)
+        else:
+            progress = TerminalProgress()
+
+        porter = Porter(
+            sqlite_config=sqlite_config,
+            postgres_config=postgres_config,
+            progress=progress,
+            batch_size=args.batch_size,
+        )
+
+        reactor.callWhenRunning(porter.run)
+
+        reactor.run()
+
+    if args.curses:
+        curses.wrapper(start)
+    else:
+        start()
+
+    if end_error_exec_info:
+        exc_type, exc_value, exc_traceback = end_error_exec_info
+        traceback.print_exception(exc_type, exc_value, exc_traceback)
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index 27e53a9e56..93500dd791 100755
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -17,9 +17,8 @@
 import sys
 sys.dont_write_bytecode = True
 
-from synapse.storage import (
-    prepare_database, prepare_sqlite3_database, UpgradeDatabaseException,
-)
+from synapse.storage import UpgradeDatabaseException
+from synapse.storage.engines import create_engine
 
 from synapse.server import HomeServer
 
@@ -57,9 +56,10 @@ import os
 import re
 import resource
 import subprocess
-import sqlite3
+import yaml
+
 
-logger = logging.getLogger(__name__)
+logger = logging.getLogger("synapse.app.homeserver")
 
 
 class SynapseHomeServer(HomeServer):
@@ -103,13 +103,11 @@ class SynapseHomeServer(HomeServer):
             return None
 
     def build_db_pool(self):
+        name = self.db_config["name"]
+
         return adbapi.ConnectionPool(
-            "sqlite3", self.get_db_name(),
-            check_same_thread=False,
-            cp_min=1,
-            cp_max=1,
-            cp_openfun=prepare_database,  # Prepare the database for each conn
-                                          # so that :memory: sqlite works
+            name,
+            **self.db_config.get("args", {})
         )
 
     def create_resource_tree(self, redirect_root_to_web_client):
@@ -352,15 +350,46 @@ def setup(config_options):
 
     tls_context_factory = context_factory.ServerContextFactory(config)
 
+    if config.database_config:
+        with open(config.database_config, 'r') as f:
+            db_config = yaml.safe_load(f)
+    else:
+        db_config = {
+            "name": "sqlite3",
+            "args": {
+                "database": config.database_path,
+            },
+        }
+
+    db_config = {
+        k: v for k, v in db_config.items()
+    }
+
+    name = db_config.get("name", None)
+    if name == "psycopg2":
+        pass
+    elif name == "sqlite3":
+        db_config.setdefault("args", {}).update({
+            "cp_min": 1,
+            "cp_max": 1,
+        })
+    else:
+        raise RuntimeError("Unsupported database type '%s'" % (name,))
+
+    database_engine = create_engine(name)
+    db_config["args"]["cp_openfun"] = database_engine.on_new_connection
+
     hs = SynapseHomeServer(
         config.server_name,
         domain_with_port=domain_with_port,
         upload_dir=os.path.abspath("uploads"),
         db_name=config.database_path,
+        db_config=db_config,
         tls_context_factory=tls_context_factory,
         config=config,
         content_addr=config.content_addr,
         version_string=version_string,
+        database_engine=database_engine,
     )
 
     hs.create_resource_tree(
@@ -372,9 +401,16 @@ def setup(config_options):
     logger.info("Preparing database: %s...", db_name)
 
     try:
-        with sqlite3.connect(db_name) as db_conn:
-            prepare_sqlite3_database(db_conn)
-            prepare_database(db_conn)
+        db_conn = database_engine.module.connect(
+            **{
+                k: v for k, v in db_config.get("args", {}).items()
+                if not k.startswith("cp_")
+            }
+        )
+
+        database_engine.prepare_database(db_conn)
+
+        db_conn.commit()
     except UpgradeDatabaseException:
         sys.stderr.write(
             "\nFailed to upgrade database.\n"
diff --git a/synapse/config/database.py b/synapse/config/database.py
index 87efe54645..8dc9873f8c 100644
--- a/synapse/config/database.py
+++ b/synapse/config/database.py
@@ -26,6 +26,11 @@ class DatabaseConfig(Config):
             self.database_path = self.abspath(args.database_path)
         self.event_cache_size = self.parse_size(args.event_cache_size)
 
+        if args.database_config:
+            self.database_config = self.abspath(args.database_config)
+        else:
+            self.database_config = None
+
     @classmethod
     def add_arguments(cls, parser):
         super(DatabaseConfig, cls).add_arguments(parser)
@@ -38,6 +43,10 @@ class DatabaseConfig(Config):
             "--event-cache-size", default="100K",
             help="Number of events to cache in memory."
         )
+        db_group.add_argument(
+            "--database-config", default=None,
+            help="Location of the database configuration file."
+        )
 
     @classmethod
     def generate_config(cls, args, config_dir_path):
diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py
index 48816a242d..dffb033fbd 100644
--- a/synapse/handlers/_base.py
+++ b/synapse/handlers/_base.py
@@ -16,7 +16,6 @@
 from twisted.internet import defer
 
 from synapse.api.errors import LimitExceededError, SynapseError
-from synapse.util.async import run_on_reactor
 from synapse.crypto.event_signing import add_hashes_and_signatures
 from synapse.api.constants import Membership, EventTypes
 from synapse.types import UserID
@@ -58,8 +57,6 @@ class BaseHandler(object):
 
     @defer.inlineCallbacks
     def _create_new_client_event(self, builder):
-        yield run_on_reactor()
-
         latest_ret = yield self.store.get_latest_events_in_room(
             builder.room_id,
         )
@@ -101,8 +98,6 @@ class BaseHandler(object):
     @defer.inlineCallbacks
     def handle_new_client_event(self, event, context, extra_destinations=[],
                                 extra_users=[], suppress_auth=False):
-        yield run_on_reactor()
-
         # We now need to go and hit out to wherever we need to hit out to.
 
         if not suppress_auth:
@@ -143,7 +138,9 @@ class BaseHandler(object):
                 )
 
         # Don't block waiting on waking up all the listeners.
-        d = self.notifier.on_new_room_event(event, extra_users=extra_users)
+        notify_d = self.notifier.on_new_room_event(
+            event, extra_users=extra_users
+        )
 
         def log_failure(f):
             logger.warn(
@@ -151,8 +148,10 @@ class BaseHandler(object):
                 event.event_id, f.value
             )
 
-        d.addErrback(log_failure)
+        notify_d.addErrback(log_failure)
 
-        yield federation_handler.handle_new_event(
+        fed_d = federation_handler.handle_new_event(
             event, destinations=destinations,
         )
+
+        fed_d.addErrback(log_failure)
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 8aceac28cf..98148c13d7 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -179,7 +179,7 @@ class FederationHandler(BaseHandler):
         # it's probably a good idea to mark it as not in retry-state
         # for sending (although this is a bit of a leap)
         retry_timings = yield self.store.get_destination_retry_timings(origin)
-        if (retry_timings and retry_timings.retry_last_ts):
+        if retry_timings and retry_timings["retry_last_ts"]:
             self.store.set_destination_retry_timings(origin, 0, 0)
 
         room = yield self.store.get_room(event.room_id)
diff --git a/synapse/handlers/login.py b/synapse/handlers/login.py
index 7447800460..76647c7941 100644
--- a/synapse/handlers/login.py
+++ b/synapse/handlers/login.py
@@ -57,7 +57,7 @@ class LoginHandler(BaseHandler):
             logger.warn("Attempted to login as %s but they do not exist", user)
             raise LoginError(403, "", errcode=Codes.FORBIDDEN)
 
-        stored_hash = user_info[0]["password_hash"]
+        stored_hash = user_info["password_hash"]
         if bcrypt.checkpw(password, stored_hash):
             # generate an access token and store it.
             token = self.reg_handler._generate_token(user)
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 7b9685be7f..9667bb8674 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -274,7 +274,8 @@ class MessageHandler(BaseHandler):
         if limit is None:
             limit = 10
 
-        for event in room_list:
+        @defer.inlineCallbacks
+        def handle_room(event):
             d = {
                 "room_id": event.room_id,
                 "membership": event.membership,
@@ -290,12 +291,19 @@ class MessageHandler(BaseHandler):
             rooms_ret.append(d)
 
             if event.membership != Membership.JOIN:
-                continue
+                return
             try:
-                messages, token = yield self.store.get_recent_events_for_room(
-                    event.room_id,
-                    limit=limit,
-                    end_token=now_token.room_key,
+                (messages, token), current_state = yield defer.gatherResults(
+                    [
+                        self.store.get_recent_events_for_room(
+                            event.room_id,
+                            limit=limit,
+                            end_token=now_token.room_key,
+                        ),
+                        self.state_handler.get_current_state(
+                            event.room_id
+                        ),
+                    ]
                 )
 
                 start_token = now_token.copy_and_replace("room_key", token[0])
@@ -311,9 +319,6 @@ class MessageHandler(BaseHandler):
                     "end": end_token.to_string(),
                 }
 
-                current_state = yield self.state_handler.get_current_state(
-                    event.room_id
-                )
                 d["state"] = [
                     serialize_event(c, time_now, as_client_event)
                     for c in current_state.values()
@@ -321,6 +326,11 @@ class MessageHandler(BaseHandler):
             except:
                 logger.exception("Failed to get snapshot")
 
+        yield defer.gatherResults(
+            [handle_room(e) for e in room_list],
+            consumeErrors=True
+        )
+
         ret = {
             "rooms": rooms_ret,
             "presence": presence,
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 823affc380..bc7f1c2402 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -311,25 +311,6 @@ class RoomMemberHandler(BaseHandler):
         defer.returnValue(chunk_data)
 
     @defer.inlineCallbacks
-    def get_room_member(self, room_id, member_user_id, auth_user_id):
-        """Retrieve a room member from a room.
-
-        Args:
-            room_id : The room the member is in.
-            member_user_id : The member's user ID
-            auth_user_id : The user ID of the user making this request.
-        Returns:
-            The room member, or None if this member does not exist.
-        Raises:
-            SynapseError if something goes wrong.
-        """
-        yield self.auth.check_joined_room(room_id, auth_user_id)
-
-        member = yield self.store.get_room_member(user_id=member_user_id,
-                                                  room_id=room_id)
-        defer.returnValue(member)
-
-    @defer.inlineCallbacks
     def change_membership(self, event, context, do_auth=True):
         """ Change the membership status of a user in a room.
 
diff --git a/synapse/notifier.py b/synapse/notifier.py
index d750a6fcf7..ea854482b5 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -98,7 +98,7 @@ class _NotificationListener(object):
             try:
                 notifier.clock.cancel_call_later(self.timer)
             except:
-                logger.exception("Failed to cancel notifier timer")
+                logger.warn("Failed to cancel notifier timer")
 
 
 class Notifier(object):
diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py
index 90babd7224..7f3dd50b47 100644
--- a/synapse/push/pusherpool.py
+++ b/synapse/push/pusherpool.py
@@ -19,10 +19,7 @@ from twisted.internet import defer
 from httppusher import HttpPusher
 from synapse.push import PusherConfigException
 
-from syutil.jsonutil import encode_canonical_json
-
 import logging
-import simplejson as json
 
 logger = logging.getLogger(__name__)
 
@@ -52,8 +49,6 @@ class PusherPool:
     @defer.inlineCallbacks
     def start(self):
         pushers = yield self.store.get_all_pushers()
-        for p in pushers:
-            p['data'] = json.loads(p['data'])
         self._start_pushers(pushers)
 
     @defer.inlineCallbacks
@@ -98,7 +93,7 @@ class PusherPool:
             pushkey=pushkey,
             pushkey_ts=self.hs.get_clock().time_msec(),
             lang=lang,
-            data=encode_canonical_json(data).decode("UTF-8"),
+            data=data,
         )
         self._refresh_pusher((app_id, pushkey))
 
@@ -129,7 +124,6 @@ class PusherPool:
         p = yield self.store.get_pushers_by_app_id_and_pushkey(
             app_id_pushkey
         )
-        p['data'] = json.loads(p['data'])
 
         self._start_pushers([p])
 
diff --git a/synapse/rest/client/v1/profile.py b/synapse/rest/client/v1/profile.py
index 1e77eb49cf..7387b4adb9 100644
--- a/synapse/rest/client/v1/profile.py
+++ b/synapse/rest/client/v1/profile.py
@@ -19,9 +19,13 @@ from twisted.internet import defer
 from .base import ClientV1RestServlet, client_path_pattern
 from synapse.types import UserID
 
+import logging
 import simplejson as json
 
 
+logger = logging.getLogger(__name__)
+
+
 class ProfileDisplaynameRestServlet(ClientV1RestServlet):
     PATTERN = client_path_pattern("/profile/(?P<user_id>[^/]*)/displayname")
 
@@ -47,7 +51,8 @@ class ProfileDisplaynameRestServlet(ClientV1RestServlet):
             defer.returnValue((400, "Unable to parse name"))
 
         yield self.handlers.profile_handler.set_displayname(
-            user, auth_user, new_name)
+            user, auth_user, new_name
+        )
 
         defer.returnValue((200, {}))
 
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index f4dec70393..995114e405 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -51,7 +51,7 @@ logger = logging.getLogger(__name__)
 
 # Remember to update this number every time a change is made to database
 # schema files, so the users will be informed on server restarts.
-SCHEMA_VERSION = 15
+SCHEMA_VERSION = 16
 
 dir_path = os.path.abspath(os.path.dirname(__file__))
 
@@ -104,14 +104,16 @@ class DataStore(RoomMemberStore, RoomStore,
 
         self.client_ip_last_seen.prefill(*key + (now,))
 
-        yield self._simple_insert(
+        yield self._simple_upsert(
             "user_ips",
-            {
-                "user": user.to_string(),
+            keyvalues={
+                "user_id": user.to_string(),
                 "access_token": access_token,
-                "device_id": device_id,
                 "ip": ip,
                 "user_agent": user_agent,
+            },
+            values={
+                "device_id": device_id,
                 "last_seen": now,
             },
             desc="insert_client_ip",
@@ -120,7 +122,7 @@ class DataStore(RoomMemberStore, RoomStore,
     def get_user_ip_and_agents(self, user):
         return self._simple_select_list(
             table="user_ips",
-            keyvalues={"user": user.to_string()},
+            keyvalues={"user_id": user.to_string()},
             retcols=[
                 "device_id", "access_token", "ip", "user_agent", "last_seen"
             ],
@@ -148,21 +150,23 @@ class UpgradeDatabaseException(PrepareDatabaseException):
     pass
 
 
-def prepare_database(db_conn):
+def prepare_database(db_conn, database_engine):
     """Prepares a database for usage. Will either create all necessary tables
     or upgrade from an older schema version.
     """
     try:
         cur = db_conn.cursor()
-        version_info = _get_or_create_schema_state(cur)
+        version_info = _get_or_create_schema_state(cur, database_engine)
 
         if version_info:
             user_version, delta_files, upgraded = version_info
-            _upgrade_existing_database(cur, user_version, delta_files, upgraded)
+            _upgrade_existing_database(
+                cur, user_version, delta_files, upgraded, database_engine
+            )
         else:
-            _setup_new_database(cur)
+            _setup_new_database(cur, database_engine)
 
-        cur.execute("PRAGMA user_version = %d" % (SCHEMA_VERSION,))
+        # cur.execute("PRAGMA user_version = %d" % (SCHEMA_VERSION,))
 
         cur.close()
         db_conn.commit()
@@ -171,7 +175,7 @@ def prepare_database(db_conn):
         raise
 
 
-def _setup_new_database(cur):
+def _setup_new_database(cur, database_engine):
     """Sets up the database by finding a base set of "full schemas" and then
     applying any necessary deltas.
 
@@ -225,31 +229,30 @@ def _setup_new_database(cur):
 
     directory_entries = os.listdir(sql_dir)
 
-    sql_script = "BEGIN TRANSACTION;\n"
     for filename in fnmatch.filter(directory_entries, "*.sql"):
         sql_loc = os.path.join(sql_dir, filename)
         logger.debug("Applying schema %s", sql_loc)
-        sql_script += read_schema(sql_loc)
-        sql_script += "\n"
-    sql_script += "COMMIT TRANSACTION;"
-    cur.executescript(sql_script)
+        executescript(cur, sql_loc)
 
     cur.execute(
-        "INSERT OR REPLACE INTO schema_version (version, upgraded)"
-        " VALUES (?,?)",
-        (max_current_ver, False)
+        database_engine.convert_param_style(
+            "INSERT INTO schema_version (version, upgraded)"
+            " VALUES (?,?)"
+        ),
+        (max_current_ver, False,)
     )
 
     _upgrade_existing_database(
         cur,
         current_version=max_current_ver,
         applied_delta_files=[],
-        upgraded=False
+        upgraded=False,
+        database_engine=database_engine,
     )
 
 
 def _upgrade_existing_database(cur, current_version, applied_delta_files,
-                               upgraded):
+                               upgraded, database_engine):
     """Upgrades an existing database.
 
     Delta files can either be SQL stored in *.sql files, or python modules
@@ -305,6 +308,8 @@ def _upgrade_existing_database(cur, current_version, applied_delta_files,
     if not upgraded:
         start_ver += 1
 
+    logger.debug("applied_delta_files: %s", applied_delta_files)
+
     for v in range(start_ver, SCHEMA_VERSION + 1):
         logger.debug("Upgrading schema to v%d", v)
 
@@ -321,6 +326,7 @@ def _upgrade_existing_database(cur, current_version, applied_delta_files,
         directory_entries.sort()
         for file_name in directory_entries:
             relative_path = os.path.join(str(v), file_name)
+            logger.debug("Found file: %s", relative_path)
             if relative_path in applied_delta_files:
                 continue
 
@@ -342,9 +348,8 @@ def _upgrade_existing_database(cur, current_version, applied_delta_files,
                 module.run_upgrade(cur)
             elif ext == ".sql":
                 # A plain old .sql file, just read and execute it
-                delta_schema = read_schema(absolute_path)
                 logger.debug("Applying schema %s", relative_path)
-                cur.executescript(delta_schema)
+                executescript(cur, absolute_path)
             else:
                 # Not a valid delta file.
                 logger.warn(
@@ -356,24 +361,82 @@ def _upgrade_existing_database(cur, current_version, applied_delta_files,
 
             # Mark as done.
             cur.execute(
-                "INSERT INTO applied_schema_deltas (version, file)"
-                " VALUES (?,?)",
+                database_engine.convert_param_style(
+                    "INSERT INTO applied_schema_deltas (version, file)"
+                    " VALUES (?,?)",
+                ),
                 (v, relative_path)
             )
 
             cur.execute(
-                "INSERT OR REPLACE INTO schema_version (version, upgraded)"
-                " VALUES (?,?)",
+                database_engine.convert_param_style(
+                    "REPLACE INTO schema_version (version, upgraded)"
+                    " VALUES (?,?)",
+                ),
                 (v, True)
             )
 
 
-def _get_or_create_schema_state(txn):
+def get_statements(f):
+    statement_buffer = ""
+    in_comment = False  # If we're in a /* ... */ style comment
+
+    for line in f:
+        line = line.strip()
+
+        if in_comment:
+            # Check if this line contains an end to the comment
+            comments = line.split("*/", 1)
+            if len(comments) == 1:
+                continue
+            line = comments[1]
+            in_comment = False
+
+        # Remove inline block comments
+        line = re.sub(r"/\*.*\*/", " ", line)
+
+        # Does this line start a comment?
+        comments = line.split("/*", 1)
+        if len(comments) > 1:
+            line = comments[0]
+            in_comment = True
+
+        # Deal with line comments
+        line = line.split("--", 1)[0]
+        line = line.split("//", 1)[0]
+
+        # Find *all* semicolons. We need to treat first and last entry
+        # specially.
+        statements = line.split(";")
+
+        # We must prepend statement_buffer to the first statement
+        first_statement = "%s %s" % (
+            statement_buffer.strip(),
+            statements[0].strip()
+        )
+        statements[0] = first_statement
+
+        # Every entry, except the last, is a full statement
+        for statement in statements[:-1]:
+            yield statement.strip()
+
+        # The last entry did *not* end in a semicolon, so we store it for the
+        # next semicolon we find
+        statement_buffer = statements[-1].strip()
+
+
+def executescript(txn, schema_path):
+    with open(schema_path, 'r') as f:
+        for statement in get_statements(f):
+            txn.execute(statement)
+
+
+def _get_or_create_schema_state(txn, database_engine):
+    # Bluntly try creating the schema_version tables.
     schema_path = os.path.join(
         dir_path, "schema", "schema_version.sql",
     )
-    create_schema = read_schema(schema_path)
-    txn.executescript(create_schema)
+    executescript(txn, schema_path)
 
     txn.execute("SELECT version, upgraded FROM schema_version")
     row = txn.fetchone()
@@ -382,10 +445,13 @@ def _get_or_create_schema_state(txn):
 
     if current_version:
         txn.execute(
-            "SELECT file FROM applied_schema_deltas WHERE version >= ?",
+            database_engine.convert_param_style(
+                "SELECT file FROM applied_schema_deltas WHERE version >= ?"
+            ),
             (current_version,)
         )
-        return current_version, txn.fetchall(), upgraded
+        applied_deltas = [d for d, in txn.fetchall()]
+        return current_version, applied_deltas, upgraded
 
     return None
 
@@ -417,7 +483,7 @@ def prepare_sqlite3_database(db_conn):
 
             if row and row[0]:
                 db_conn.execute(
-                    "INSERT OR REPLACE INTO schema_version (version, upgraded)"
+                    "REPLACE INTO schema_version (version, upgraded)"
                     " VALUES (?,?)",
                     (row[0], False)
                 )
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index e3e67d8e0d..f5952d1fc0 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -22,6 +22,8 @@ from synapse.util.logcontext import PreserveLoggingContext, LoggingContext
 from synapse.util.lrucache import LruCache
 import synapse.metrics
 
+from util.id_generators import IdGenerator, StreamIdGenerator
+
 from twisted.internet import defer
 
 from collections import namedtuple, OrderedDict
@@ -145,11 +147,12 @@ class LoggingTransaction(object):
     """An object that almost-transparently proxies for the 'txn' object
     passed to the constructor. Adds logging and metrics to the .execute()
     method."""
-    __slots__ = ["txn", "name"]
+    __slots__ = ["txn", "name", "database_engine"]
 
-    def __init__(self, txn, name):
+    def __init__(self, txn, name, database_engine):
         object.__setattr__(self, "txn", txn)
         object.__setattr__(self, "name", name)
+        object.__setattr__(self, "database_engine", database_engine)
 
     def __getattr__(self, name):
         return getattr(self.txn, name)
@@ -161,26 +164,32 @@ class LoggingTransaction(object):
         # TODO(paul): Maybe use 'info' and 'debug' for values?
         sql_logger.debug("[SQL] {%s} %s", self.name, sql)
 
-        try:
-            if args and args[0]:
-                values = args[0]
+        sql = self.database_engine.convert_param_style(sql)
+
+        if args and args[0]:
+            args = list(args)
+            args[0] = [
+                self.database_engine.encode_parameter(a) for a in args[0]
+            ]
+            try:
                 sql_logger.debug(
-                    "[SQL values] {%s} " + ", ".join(("<%r>",) * len(values)),
+                    "[SQL values] {%s} " + ", ".join(("<%r>",) * len(args[0])),
                     self.name,
-                    *values
+                    *args[0]
                 )
-        except:
-            # Don't let logging failures stop SQL from working
-            pass
+            except:
+                # Don't let logging failures stop SQL from working
+                pass
 
         start = time.time() * 1000
+
         try:
             return self.txn.execute(
                 sql, *args, **kwargs
             )
-        except:
-                logger.exception("[SQL FAIL] {%s}", self.name)
-                raise
+        except Exception as e:
+            logger.debug("[SQL FAIL] {%s} %s", self.name, e)
+            raise
         finally:
             msecs = (time.time() * 1000) - start
             sql_logger.debug("[SQL time] {%s} %f", self.name, msecs)
@@ -245,6 +254,14 @@ class SQLBaseStore(object):
         self._get_event_cache = Cache("*getEvent*", keylen=3, lru=True,
                                       max_entries=hs.config.event_cache_size)
 
+        self.database_engine = hs.database_engine
+
+        self._stream_id_gen = StreamIdGenerator()
+        self._transaction_id_gen = IdGenerator("sent_transactions", "id", self)
+        self._state_groups_id_gen = IdGenerator("state_groups", "id", self)
+        self._access_tokens_id_gen = IdGenerator("access_tokens", "id", self)
+        self._pushers_id_gen = IdGenerator("pushers", "id", self)
+
     def start_profiling(self):
         self._previous_loop_ts = self._clock.time_msec()
 
@@ -281,7 +298,7 @@ class SQLBaseStore(object):
 
         start_time = time.time() * 1000
 
-        def inner_func(txn, *args, **kwargs):
+        def inner_func(conn, *args, **kwargs):
             with LoggingContext("runInteraction") as context:
                 current_context.copy_to(context)
                 start = time.time() * 1000
@@ -296,9 +313,25 @@ class SQLBaseStore(object):
                 sql_scheduling_timer.inc_by(time.time() * 1000 - start_time)
                 transaction_logger.debug("[TXN START] {%s}", name)
                 try:
-                    return func(LoggingTransaction(txn, name), *args, **kwargs)
-                except:
-                    logger.exception("[TXN FAIL] {%s}", name)
+                    i = 0
+                    N = 5
+                    while True:
+                        try:
+                            txn = conn.cursor()
+                            return func(
+                                LoggingTransaction(txn, name, self.database_engine),
+                                *args, **kwargs
+                            )
+                        except self.database_engine.module.DatabaseError as e:
+                            if self.database_engine.is_deadlock(e):
+                                logger.warn("[TXN DEADLOCK] {%s} %d/%d", name, i, N)
+                                if i < N:
+                                    i += 1
+                                    conn.rollback()
+                                    continue
+                            raise
+                except Exception as e:
+                    logger.debug("[TXN FAIL] {%s} %s", name, e)
                     raise
                 finally:
                     end = time.time() * 1000
@@ -311,7 +344,7 @@ class SQLBaseStore(object):
                     sql_txn_timer.inc_by(duration, desc)
 
         with PreserveLoggingContext():
-            result = yield self._db_pool.runInteraction(
+            result = yield self._db_pool.runWithConnection(
                 inner_func, *args, **kwargs
             )
         defer.returnValue(result)
@@ -342,11 +375,11 @@ class SQLBaseStore(object):
             The result of decoder(results)
         """
         def interaction(txn):
-            cursor = txn.execute(query, args)
+            txn.execute(query, args)
             if decoder:
-                return decoder(cursor)
+                return decoder(txn)
             else:
-                return cursor.fetchall()
+                return txn.fetchall()
 
         return self.runInteraction(desc, interaction)
 
@@ -356,27 +389,29 @@ class SQLBaseStore(object):
     # "Simple" SQL API methods that operate on a single table with no JOINs,
     # no complex WHERE clauses, just a dict of values for columns.
 
-    def _simple_insert(self, table, values, or_replace=False, or_ignore=False,
+    @defer.inlineCallbacks
+    def _simple_insert(self, table, values, or_ignore=False,
                        desc="_simple_insert"):
         """Executes an INSERT query on the named table.
 
         Args:
             table : string giving the table name
             values : dict of new column names and values for them
-            or_replace : bool; if True performs an INSERT OR REPLACE
         """
-        return self.runInteraction(
-            desc,
-            self._simple_insert_txn, table, values, or_replace=or_replace,
-            or_ignore=or_ignore,
-        )
+        try:
+            yield self.runInteraction(
+                desc,
+                self._simple_insert_txn, table, values,
+            )
+        except self.database_engine.module.IntegrityError:
+            # We have to do or_ignore flag at this layer, since we can't reuse
+            # a cursor after we receive an error from the db.
+            if not or_ignore:
+                raise
 
     @log_function
-    def _simple_insert_txn(self, txn, table, values, or_replace=False,
-                           or_ignore=False):
-        sql = "%s INTO %s (%s) VALUES(%s)" % (
-            ("INSERT OR REPLACE" if or_replace else
-             "INSERT OR IGNORE" if or_ignore else "INSERT"),
+    def _simple_insert_txn(self, txn, table, values):
+        sql = "INSERT INTO %s (%s) VALUES(%s)" % (
             table,
             ", ".join(k for k in values),
             ", ".join("?" for k in values)
@@ -388,22 +423,23 @@ class SQLBaseStore(object):
         )
 
         txn.execute(sql, values.values())
-        return txn.lastrowid
 
-    def _simple_upsert(self, table, keyvalues, values, desc="_simple_upsert"):
+    def _simple_upsert(self, table, keyvalues, values,
+                       insertion_values={}, desc="_simple_upsert"):
         """
         Args:
             table (str): The table to upsert into
             keyvalues (dict): The unique key tables and their new values
             values (dict): The nonunique columns and their new values
+            insertion_values (dict): key/values to use when inserting
         Returns: A deferred
         """
         return self.runInteraction(
             desc,
-            self._simple_upsert_txn, table, keyvalues, values
+            self._simple_upsert_txn, table, keyvalues, values, insertion_values,
         )
 
-    def _simple_upsert_txn(self, txn, table, keyvalues, values):
+    def _simple_upsert_txn(self, txn, table, keyvalues, values, insertion_values={}):
         # Try to update
         sql = "UPDATE %s SET %s WHERE %s" % (
             table,
@@ -422,6 +458,7 @@ class SQLBaseStore(object):
             allvalues = {}
             allvalues.update(keyvalues)
             allvalues.update(values)
+            allvalues.update(insertion_values)
 
             sql = "INSERT INTO %s (%s) VALUES (%s)" % (
                 table,
@@ -489,8 +526,7 @@ class SQLBaseStore(object):
 
     def _simple_select_onecol_txn(self, txn, table, keyvalues, retcol):
         sql = (
-            "SELECT %(retcol)s FROM %(table)s WHERE %(where)s "
-            "ORDER BY rowid asc"
+            "SELECT %(retcol)s FROM %(table)s WHERE %(where)s"
         ) % {
             "retcol": retcol,
             "table": table,
@@ -548,14 +584,14 @@ class SQLBaseStore(object):
             retcols : list of strings giving the names of the columns to return
         """
         if keyvalues:
-            sql = "SELECT %s FROM %s WHERE %s ORDER BY rowid asc" % (
+            sql = "SELECT %s FROM %s WHERE %s" % (
                 ", ".join(retcols),
                 table,
                 " AND ".join("%s = ?" % (k, ) for k in keyvalues)
             )
             txn.execute(sql, keyvalues.values())
         else:
-            sql = "SELECT %s FROM %s ORDER BY rowid asc" % (
+            sql = "SELECT %s FROM %s" % (
                 ", ".join(retcols),
                 table
             )
@@ -607,10 +643,10 @@ class SQLBaseStore(object):
 
     def _simple_select_one_txn(self, txn, table, keyvalues, retcols,
                                allow_none=False):
-        select_sql = "SELECT %s FROM %s WHERE %s ORDER BY rowid asc" % (
+        select_sql = "SELECT %s FROM %s WHERE %s" % (
             ", ".join(retcols),
             table,
-            " AND ".join("%s = ?" % (k) for k in keyvalues)
+            " AND ".join("%s = ?" % (k,) for k in keyvalues)
         )
 
         txn.execute(select_sql, keyvalues.values())
@@ -648,6 +684,11 @@ class SQLBaseStore(object):
                     updatevalues=updatevalues,
                 )
 
+                # if txn.rowcount == 0:
+                #     raise StoreError(404, "No row found")
+                if txn.rowcount > 1:
+                    raise StoreError(500, "More than one row matched")
+
             return ret
         return self.runInteraction(desc, func)
 
@@ -860,6 +901,12 @@ class SQLBaseStore(object):
         result = txn.fetchone()
         return result[0] if result else None
 
+    def get_next_stream_id(self):
+        with self._next_stream_id_lock:
+            i = self._next_stream_id
+            self._next_stream_id += 1
+            return i
+
 
 class _RollbackButIsFineException(Exception):
     """ This exception is used to rollback a transaction without implying
@@ -883,7 +930,7 @@ class Table(object):
 
     _select_where_clause = "SELECT %s FROM %s WHERE %s"
     _select_clause = "SELECT %s FROM %s"
-    _insert_clause = "INSERT OR REPLACE INTO %s (%s) VALUES (%s)"
+    _insert_clause = "REPLACE INTO %s (%s) VALUES (%s)"
 
     @classmethod
     def select_statement(cls, where_clause=None):
diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py
index f8cbb3f323..63d1af4e86 100644
--- a/synapse/storage/appservice.py
+++ b/synapse/storage/appservice.py
@@ -366,11 +366,11 @@ class ApplicationServiceTransactionStore(SQLBaseStore):
         new_txn_id = max(highest_txn_id, last_txn_id) + 1
 
         # Insert new txn into txn table
-        event_ids = [e.event_id for e in events]
+        event_ids = json.dumps([e.event_id for e in events])
         txn.execute(
             "INSERT INTO application_services_txns(as_id, txn_id, event_ids) "
             "VALUES(?,?,?)",
-            (service.id, new_txn_id, json.dumps(event_ids))
+            (service.id, new_txn_id, event_ids)
         )
         return AppServiceTransaction(
             service=service, id=new_txn_id, events=events
diff --git a/synapse/storage/directory.py b/synapse/storage/directory.py
index 0199539fea..2b2bdf8615 100644
--- a/synapse/storage/directory.py
+++ b/synapse/storage/directory.py
@@ -21,8 +21,6 @@ from twisted.internet import defer
 
 from collections import namedtuple
 
-import sqlite3
-
 
 RoomAliasMapping = namedtuple(
     "RoomAliasMapping",
@@ -91,7 +89,7 @@ class DirectoryStore(SQLBaseStore):
                 },
                 desc="create_room_alias_association",
             )
-        except sqlite3.IntegrityError:
+        except self.database_engine.module.IntegrityError:
             raise SynapseError(
                 409, "Room alias %s already exists" % room_alias.to_string()
             )
@@ -120,12 +118,12 @@ class DirectoryStore(SQLBaseStore):
         defer.returnValue(room_id)
 
     def _delete_room_alias_txn(self, txn, room_alias):
-        cursor = txn.execute(
+        txn.execute(
             "SELECT room_id FROM room_aliases WHERE room_alias = ?",
             (room_alias.to_string(),)
         )
 
-        res = cursor.fetchone()
+        res = txn.fetchone()
         if res:
             room_id = res[0]
         else:
diff --git a/synapse/storage/engines/__init__.py b/synapse/storage/engines/__init__.py
new file mode 100644
index 0000000000..eb76df7f01
--- /dev/null
+++ b/synapse/storage/engines/__init__.py
@@ -0,0 +1,37 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from .postgres import PostgresEngine
+from .sqlite3 import Sqlite3Engine
+
+import importlib
+
+
+SUPPORTED_MODULE = {
+    "sqlite3": Sqlite3Engine,
+    "psycopg2": PostgresEngine,
+}
+
+
+def create_engine(name):
+    engine_class = SUPPORTED_MODULE.get(name, None)
+
+    if engine_class:
+        module = importlib.import_module(name)
+        return engine_class(module)
+
+    raise RuntimeError(
+        "Unsupported database engine '%s'" % (name,)
+    )
diff --git a/synapse/storage/engines/postgres.py b/synapse/storage/engines/postgres.py
new file mode 100644
index 0000000000..6f75245fa7
--- /dev/null
+++ b/synapse/storage/engines/postgres.py
@@ -0,0 +1,41 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from synapse.storage import prepare_database
+
+
+class PostgresEngine(object):
+    def __init__(self, database_module):
+        self.module = database_module
+        self.module.extensions.register_type(self.module.extensions.UNICODE)
+
+    def convert_param_style(self, sql):
+        return sql.replace("?", "%s")
+
+    def encode_parameter(self, param):
+        return param
+
+    def on_new_connection(self, db_conn):
+        db_conn.set_isolation_level(
+            self.module.extensions.ISOLATION_LEVEL_REPEATABLE_READ
+        )
+
+    def prepare_database(self, db_conn):
+        prepare_database(db_conn, self)
+
+    def is_deadlock(self, error):
+        if isinstance(error, self.module.DatabaseError):
+            return error.pgcode in ["40001", "40P01"]
+        return False
diff --git a/synapse/storage/engines/sqlite3.py b/synapse/storage/engines/sqlite3.py
new file mode 100644
index 0000000000..72c11df461
--- /dev/null
+++ b/synapse/storage/engines/sqlite3.py
@@ -0,0 +1,37 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from synapse.storage import prepare_database, prepare_sqlite3_database
+
+
+class Sqlite3Engine(object):
+    def __init__(self, database_module):
+        self.module = database_module
+
+    def convert_param_style(self, sql):
+        return sql
+
+    def encode_parameter(self, param):
+        return param
+
+    def on_new_connection(self, db_conn):
+        self.prepare_database(db_conn)
+
+    def prepare_database(self, db_conn):
+        prepare_sqlite3_database(db_conn)
+        prepare_database(db_conn, self)
+
+    def is_deadlock(self, error):
+        return False
diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py
index 032334bfd6..54a3c9d805 100644
--- a/synapse/storage/event_federation.py
+++ b/synapse/storage/event_federation.py
@@ -153,7 +153,7 @@ class EventFederationStore(SQLBaseStore):
         results = self._get_prev_events_and_state(
             txn,
             event_id,
-            is_state=1,
+            is_state=True,
         )
 
         return [(e_id, h, ) for e_id, h, _ in results]
@@ -164,7 +164,7 @@ class EventFederationStore(SQLBaseStore):
         }
 
         if is_state is not None:
-            keyvalues["is_state"] = is_state
+            keyvalues["is_state"] = bool(is_state)
 
         res = self._simple_select_list_txn(
             txn,
@@ -242,7 +242,6 @@ class EventFederationStore(SQLBaseStore):
                     "room_id": room_id,
                     "min_depth": depth,
                 },
-                or_replace=True,
             )
 
     def _handle_prev_events(self, txn, outlier, event_id, prev_events,
@@ -260,9 +259,8 @@ class EventFederationStore(SQLBaseStore):
                     "event_id": event_id,
                     "prev_event_id": e_id,
                     "room_id": room_id,
-                    "is_state": 0,
+                    "is_state": False,
                 },
-                or_ignore=True,
             )
 
         # Update the extremities table if this is not an outlier.
@@ -281,19 +279,19 @@ class EventFederationStore(SQLBaseStore):
             # We only insert as a forward extremity the new event if there are
             # no other events that reference it as a prev event
             query = (
-                "INSERT OR IGNORE INTO %(table)s (event_id, room_id) "
-                "SELECT ?, ? WHERE NOT EXISTS ("
-                "SELECT 1 FROM %(event_edges)s WHERE "
-                "prev_event_id = ? "
-                ")"
-            ) % {
-                "table": "event_forward_extremities",
-                "event_edges": "event_edges",
-            }
+                "SELECT 1 FROM event_edges WHERE prev_event_id = ?"
+            )
 
-            logger.debug("query: %s", query)
+            txn.execute(query, (event_id,))
 
-            txn.execute(query, (event_id, room_id, event_id))
+            if not txn.fetchone():
+                query = (
+                    "INSERT INTO event_forward_extremities"
+                    " (event_id, room_id)"
+                    " VALUES (?, ?)"
+                )
+
+                txn.execute(query, (event_id, room_id))
 
             # Insert all the prev_events as a backwards thing, they'll get
             # deleted in a second if they're incorrect anyway.
@@ -306,7 +304,6 @@ class EventFederationStore(SQLBaseStore):
                         "event_id": e_id,
                         "room_id": room_id,
                     },
-                    or_ignore=True,
                 )
 
             # Also delete from the backwards extremities table all ones that
@@ -400,7 +397,7 @@ class EventFederationStore(SQLBaseStore):
 
         query = (
             "SELECT prev_event_id FROM event_edges "
-            "WHERE room_id = ? AND event_id = ? AND is_state = 0 "
+            "WHERE room_id = ? AND event_id = ? AND is_state = ? "
             "LIMIT ?"
         )
 
@@ -409,7 +406,7 @@ class EventFederationStore(SQLBaseStore):
             for event_id in front:
                 txn.execute(
                     query,
-                    (room_id, event_id, limit - len(event_results))
+                    (room_id, event_id, False, limit - len(event_results))
                 )
 
                 for e_id, in txn.fetchall():
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 2425f57f5f..7dbf7a396a 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -52,7 +52,6 @@ class EventsStore(SQLBaseStore):
                 is_new_state=is_new_state,
                 current_state=current_state,
             )
-            self.get_room_events_max_id.invalidate()
         except _RollbackButIsFineException:
             pass
 
@@ -96,12 +95,22 @@ class EventsStore(SQLBaseStore):
         # Remove the any existing cache entries for the event_id
         self._invalidate_get_event_cache(event.event_id)
 
+        if stream_ordering is None:
+            with self._stream_id_gen.get_next_txn(txn) as stream_ordering:
+                return self._persist_event_txn(
+                    txn, event, context, backfilled,
+                    stream_ordering=stream_ordering,
+                    is_new_state=is_new_state,
+                    current_state=current_state,
+                )
+
         # We purposefully do this first since if we include a `current_state`
         # key, we *want* to update the `current_state_events` table
         if current_state:
-            txn.execute(
-                "DELETE FROM current_state_events WHERE room_id = ?",
-                (event.room_id,)
+            self._simple_delete_txn(
+                txn,
+                table="current_state_events",
+                keyvalues={"room_id": event.room_id},
             )
 
             for s in current_state:
@@ -114,7 +123,6 @@ class EventsStore(SQLBaseStore):
                         "type": s.type,
                         "state_key": s.state_key,
                     },
-                    or_replace=True,
                 )
 
         if event.is_state() and is_new_state:
@@ -128,7 +136,6 @@ class EventsStore(SQLBaseStore):
                         "type": event.type,
                         "state_key": event.state_key,
                     },
-                    or_replace=True,
                 )
 
                 for prev_state_id, _ in event.prev_state:
@@ -151,14 +158,6 @@ class EventsStore(SQLBaseStore):
                 event.depth
             )
 
-        self._handle_prev_events(
-            txn,
-            outlier=outlier,
-            event_id=event.event_id,
-            prev_events=event.prev_events,
-            room_id=event.room_id,
-        )
-
         have_persisted = self._simple_select_one_onecol_txn(
             txn,
             table="event_json",
@@ -169,7 +168,7 @@ class EventsStore(SQLBaseStore):
 
         metadata_json = encode_canonical_json(
             event.internal_metadata.get_dict()
-        )
+        ).decode("UTF-8")
 
         # If we have already persisted this event, we don't need to do any
         # more processing.
@@ -185,23 +184,29 @@ class EventsStore(SQLBaseStore):
                 )
                 txn.execute(
                     sql,
-                    (metadata_json.decode("UTF-8"), event.event_id,)
+                    (metadata_json, event.event_id,)
                 )
 
                 sql = (
-                    "UPDATE events SET outlier = 0"
+                    "UPDATE events SET outlier = ?"
                     " WHERE event_id = ?"
                 )
                 txn.execute(
                     sql,
-                    (event.event_id,)
+                    (False, event.event_id,)
                 )
             return
 
+        self._handle_prev_events(
+            txn,
+            outlier=outlier,
+            event_id=event.event_id,
+            prev_events=event.prev_events,
+            room_id=event.room_id,
+        )
+
         if event.type == EventTypes.Member:
             self._store_room_member_txn(txn, event)
-        elif event.type == EventTypes.Feedback:
-            self._store_feedback_txn(txn, event)
         elif event.type == EventTypes.Name:
             self._store_room_name_txn(txn, event)
         elif event.type == EventTypes.Topic:
@@ -224,10 +229,9 @@ class EventsStore(SQLBaseStore):
             values={
                 "event_id": event.event_id,
                 "room_id": event.room_id,
-                "internal_metadata": metadata_json.decode("UTF-8"),
+                "internal_metadata": metadata_json,
                 "json": encode_canonical_json(event_dict).decode("UTF-8"),
             },
-            or_replace=True,
         )
 
         content = encode_canonical_json(
@@ -245,9 +249,6 @@ class EventsStore(SQLBaseStore):
             "depth": event.depth,
         }
 
-        if stream_ordering is not None:
-            vals["stream_ordering"] = stream_ordering
-
         unrec = {
             k: v
             for k, v in event.get_dict().items()
@@ -264,21 +265,20 @@ class EventsStore(SQLBaseStore):
             unrec
         ).decode("UTF-8")
 
-        try:
-            self._simple_insert_txn(
-                txn,
-                "events",
-                vals,
-                or_replace=(not outlier),
-                or_ignore=bool(outlier),
-            )
-        except:
-            logger.warn(
-                "Failed to persist, probably duplicate: %s",
-                event.event_id,
-                exc_info=True,
+        sql = (
+            "INSERT INTO events"
+            " (stream_ordering, topological_ordering, event_id, type,"
+            " room_id, content, processed, outlier, depth)"
+            " VALUES (?,?,?,?,?,?,?,?,?)"
+        )
+
+        txn.execute(
+            sql,
+            (
+                stream_ordering, event.depth, event.event_id, event.type,
+                event.room_id, content, True, outlier, event.depth
             )
-            raise _RollbackButIsFineException("_persist_event")
+        )
 
         if context.rejected:
             self._store_rejections_txn(txn, event.event_id, context.rejected)
@@ -302,15 +302,17 @@ class EventsStore(SQLBaseStore):
             )
 
             if is_new_state and not context.rejected:
-                self._simple_insert_txn(
+                self._simple_upsert_txn(
                     txn,
                     "current_state_events",
-                    {
-                        "event_id": event.event_id,
+                    keyvalues={
                         "room_id": event.room_id,
                         "type": event.type,
                         "state_key": event.state_key,
                     },
+                    values={
+                        "event_id": event.event_id,
+                    }
                 )
 
             for e_id, h in event.prev_state:
@@ -321,7 +323,7 @@ class EventsStore(SQLBaseStore):
                         "event_id": event.event_id,
                         "prev_event_id": e_id,
                         "room_id": event.room_id,
-                        "is_state": 1,
+                        "is_state": True,
                     },
                 )
 
diff --git a/synapse/storage/keys.py b/synapse/storage/keys.py
index 09d1e63657..d3b9b38664 100644
--- a/synapse/storage/keys.py
+++ b/synapse/storage/keys.py
@@ -57,16 +57,18 @@ class KeyStore(SQLBaseStore):
             OpenSSL.crypto.FILETYPE_ASN1, tls_certificate
         )
         fingerprint = hashlib.sha256(tls_certificate_bytes).hexdigest()
-        return self._simple_insert(
+        return self._simple_upsert(
             table="server_tls_certificates",
-            values={
+            keyvalues={
                 "server_name": server_name,
                 "fingerprint": fingerprint,
+            },
+            values={
                 "from_server": from_server,
                 "ts_added_ms": time_now_ms,
                 "tls_certificate": buffer(tls_certificate_bytes),
             },
-            or_ignore=True,
+            desc="store_server_certificate",
         )
 
     @defer.inlineCallbacks
@@ -107,14 +109,16 @@ class KeyStore(SQLBaseStore):
             ts_now_ms (int): The time now in milliseconds
             verification_key (VerifyKey): The NACL verify key.
         """
-        return self._simple_insert(
+        return self._simple_upsert(
             table="server_signature_keys",
-            values={
+            keyvalues={
                 "server_name": server_name,
                 "key_id": "%s:%s" % (verify_key.alg, verify_key.version),
+            },
+            values={
                 "from_server": from_server,
                 "ts_added_ms": time_now_ms,
                 "verify_key": buffer(verify_key.encode()),
             },
-            or_ignore=True,
+            desc="store_server_verify_key",
         )
diff --git a/synapse/storage/presence.py b/synapse/storage/presence.py
index 87fba55439..22ec94bc16 100644
--- a/synapse/storage/presence.py
+++ b/synapse/storage/presence.py
@@ -57,6 +57,7 @@ class PresenceStore(SQLBaseStore):
             values={"observed_user_id": observed_localpart,
                     "observer_user_id": observer_userid},
             desc="allow_presence_visible",
+            or_ignore=True,
         )
 
     def disallow_presence_visible(self, observed_localpart, observer_userid):
diff --git a/synapse/storage/profile.py b/synapse/storage/profile.py
index a6e52cb248..047698aa13 100644
--- a/synapse/storage/profile.py
+++ b/synapse/storage/profile.py
@@ -13,6 +13,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from twisted.internet import defer
+
 from ._base import SQLBaseStore
 
 
@@ -24,14 +26,17 @@ class ProfileStore(SQLBaseStore):
             desc="create_profile",
         )
 
+    @defer.inlineCallbacks
     def get_profile_displayname(self, user_localpart):
-        return self._simple_select_one_onecol(
+        name = yield self._simple_select_one_onecol(
             table="profiles",
             keyvalues={"user_id": user_localpart},
             retcol="displayname",
             desc="get_profile_displayname",
         )
 
+        defer.returnValue(name)
+
     def set_profile_displayname(self, user_localpart, new_displayname):
         return self._simple_update_one(
             table="profiles",
diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py
index c47bdc2861..ee7718d5ed 100644
--- a/synapse/storage/push_rule.py
+++ b/synapse/storage/push_rule.py
@@ -154,7 +154,7 @@ class PushRuleStore(SQLBaseStore):
             txn.execute(sql, (user_name, priority_class, new_rule_priority))
 
         # now insert the new rule
-        sql = "INSERT OR REPLACE INTO "+PushRuleTable.table_name+" ("
+        sql = "INSERT INTO "+PushRuleTable.table_name+" ("
         sql += ",".join(new_rule.keys())+") VALUES ("
         sql += ", ".join(["?" for _ in new_rule.keys()])+")"
 
@@ -183,7 +183,7 @@ class PushRuleStore(SQLBaseStore):
         new_rule['priority_class'] = priority_class
         new_rule['priority'] = new_prio
 
-        sql = "INSERT OR REPLACE INTO "+PushRuleTable.table_name+" ("
+        sql = "INSERT INTO "+PushRuleTable.table_name+" ("
         sql += ",".join(new_rule.keys())+") VALUES ("
         sql += ", ".join(["?" for _ in new_rule.keys()])+")"
 
diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py
index 000502b4ff..a44bccdca6 100644
--- a/synapse/storage/pusher.py
+++ b/synapse/storage/pusher.py
@@ -13,14 +13,15 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import collections
-
 from ._base import SQLBaseStore, Table
 from twisted.internet import defer
 
 from synapse.api.errors import StoreError
 
+from syutil.jsonutil import encode_canonical_json
+
 import logging
+import simplejson as json
 
 logger = logging.getLogger(__name__)
 
@@ -52,7 +53,7 @@ class PusherStore(SQLBaseStore):
                 "device_display_name": r[6],
                 "pushkey": r[7],
                 "pushkey_ts": r[8],
-                "data": r[9],
+                "data": json.loads(r[9]),
                 "last_token": r[10],
                 "last_success": r[11],
                 "failing_since": r[12]
@@ -84,7 +85,7 @@ class PusherStore(SQLBaseStore):
                 "device_display_name": r[6],
                 "pushkey": r[7],
                 "pushkey_ts": r[8],
-                "data": r[9],
+                "data": json.loads(r[9]),
                 "last_token": r[10],
                 "last_success": r[11],
                 "failing_since": r[12]
@@ -99,6 +100,7 @@ class PusherStore(SQLBaseStore):
                    app_display_name, device_display_name,
                    pushkey, pushkey_ts, lang, data):
         try:
+            next_id = yield self._pushers_id_gen.get_next()
             yield self._simple_upsert(
                 PushersTable.table_name,
                 dict(
@@ -113,7 +115,10 @@ class PusherStore(SQLBaseStore):
                     device_display_name=device_display_name,
                     ts=pushkey_ts,
                     lang=lang,
-                    data=data
+                    data=encode_canonical_json(data).decode("UTF-8"),
+                ),
+                insertion_values=dict(
+                    id=next_id,
                 ),
                 desc="add_pusher",
             )
@@ -160,21 +165,3 @@ class PusherStore(SQLBaseStore):
 
 class PushersTable(Table):
     table_name = "pushers"
-
-    fields = [
-        "id",
-        "user_name",
-        "kind",
-        "profile_tag",
-        "app_id",
-        "app_display_name",
-        "device_display_name",
-        "pushkey",
-        "pushkey_ts",
-        "data",
-        "last_token",
-        "last_success",
-        "failing_since"
-    ]
-
-    EntryType = collections.namedtuple("PusherEntry", fields)
diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py
index f24154f146..2a5c5080e4 100644
--- a/synapse/storage/registration.py
+++ b/synapse/storage/registration.py
@@ -15,8 +15,6 @@
 
 from twisted.internet import defer
 
-from sqlite3 import IntegrityError
-
 from synapse.api.errors import StoreError, Codes
 
 from ._base import SQLBaseStore, cached
@@ -39,17 +37,13 @@ class RegistrationStore(SQLBaseStore):
         Raises:
             StoreError if there was a problem adding this.
         """
-        row = yield self._simple_select_one(
-            "users", {"name": user_id}, ["id"],
-            desc="add_access_token_to_user",
-        )
-        if not row:
-            raise StoreError(400, "Bad user ID supplied.")
-        row_id = row["id"]
+        next_id = yield self._access_tokens_id_gen.get_next()
+
         yield self._simple_insert(
             "access_tokens",
             {
-                "user_id": row_id,
+                "id": next_id,
+                "user_id": user_id,
                 "token": token
             },
             desc="add_access_token_to_user",
@@ -74,27 +68,38 @@ class RegistrationStore(SQLBaseStore):
     def _register(self, txn, user_id, token, password_hash):
         now = int(self.clock.time())
 
+        next_id = self._access_tokens_id_gen.get_next_txn(txn)
+
         try:
             txn.execute("INSERT INTO users(name, password_hash, creation_ts) "
                         "VALUES (?,?,?)",
                         [user_id, password_hash, now])
-        except IntegrityError:
+        except self.database_engine.module.IntegrityError:
             raise StoreError(
                 400, "User ID already taken.", errcode=Codes.USER_IN_USE
             )
 
         # it's possible for this to get a conflict, but only for a single user
         # since tokens are namespaced based on their user ID
-        txn.execute("INSERT INTO access_tokens(user_id, token) " +
-                    "VALUES (?,?)", [txn.lastrowid, token])
+        txn.execute(
+            "INSERT INTO access_tokens(id, user_id, token)"
+            " VALUES (?,?,?)",
+            (next_id, user_id, token,)
+        )
 
+    @defer.inlineCallbacks
     def get_user_by_id(self, user_id):
-        query = ("SELECT users.name, users.password_hash FROM users"
-                 " WHERE users.name = ?")
-        return self._execute(
-            "get_user_by_id", self.cursor_to_dict, query, user_id
+        user_info = yield self._simple_select_one(
+            table="users",
+            keyvalues={
+                "name": user_id,
+            },
+            retcols=["name", "password_hash"],
+            allow_none=True,
         )
 
+        defer.returnValue(user_info)
+
     @cached()
     # TODO(paul): Currently there's no code to invalidate this cache. That
     #   means if/when we ever add internal ways to invalidate access tokens or
@@ -134,12 +139,12 @@ class RegistrationStore(SQLBaseStore):
             "SELECT users.name, users.admin,"
             " access_tokens.device_id, access_tokens.id as token_id"
             " FROM users"
-            " INNER JOIN access_tokens on users.id = access_tokens.user_id"
+            " INNER JOIN access_tokens on users.name = access_tokens.user_id"
             " WHERE token = ?"
         )
 
-        cursor = txn.execute(sql, (token,))
-        rows = self.cursor_to_dict(cursor)
+        txn.execute(sql, (token,))
+        rows = self.cursor_to_dict(txn)
         if rows:
             return rows[0]
 
diff --git a/synapse/storage/room.py b/synapse/storage/room.py
index be3e28c2ea..48ebb33057 100644
--- a/synapse/storage/room.py
+++ b/synapse/storage/room.py
@@ -72,6 +72,7 @@ class RoomStore(SQLBaseStore):
             keyvalues={"room_id": room_id},
             retcols=RoomsTable.fields,
             desc="get_room",
+            allow_none=True,
         )
 
     @defer.inlineCallbacks
@@ -102,24 +103,37 @@ class RoomStore(SQLBaseStore):
                 "ON c.event_id = room_names.event_id "
             )
 
-            # We use non printing ascii character US () as a seperator
+            # We use non printing ascii character US (\x1F) as a separator
             sql = (
-                "SELECT r.room_id, n.name, t.topic, "
-                "group_concat(a.room_alias, '') "
-                "FROM rooms AS r "
-                "LEFT JOIN (%(topic)s) AS t ON t.room_id = r.room_id "
-                "LEFT JOIN (%(name)s) AS n ON n.room_id = r.room_id "
-                "INNER JOIN room_aliases AS a ON a.room_id = r.room_id "
-                "WHERE r.is_public = ? "
-                "GROUP BY r.room_id "
+                "SELECT r.room_id, max(n.name), max(t.topic)"
+                " FROM rooms AS r"
+                " LEFT JOIN (%(topic)s) AS t ON t.room_id = r.room_id"
+                " LEFT JOIN (%(name)s) AS n ON n.room_id = r.room_id"
+                " WHERE r.is_public = ?"
+                " GROUP BY r.room_id"
             ) % {
                 "topic": topic_subquery,
                 "name": name_subquery,
             }
 
-            c = txn.execute(sql, (is_public,))
+            txn.execute(sql, (is_public,))
 
-            return c.fetchall()
+            rows = txn.fetchall()
+
+            for i, row in enumerate(rows):
+                room_id = row[0]
+                aliases = self._simple_select_onecol_txn(
+                    txn,
+                    table="room_aliases",
+                    keyvalues={
+                        "room_id": room_id
+                    },
+                    retcol="room_alias",
+                )
+
+                rows[i] = list(row) + [aliases]
+
+            return rows
 
         rows = yield self.runInteraction(
             "get_rooms", f
@@ -130,9 +144,10 @@ class RoomStore(SQLBaseStore):
                 "room_id": r[0],
                 "name": r[1],
                 "topic": r[2],
-                "aliases": r[3].split(""),
+                "aliases": r[3],
             }
             for r in rows
+            if r[3]  # We only return rooms that have at least one alias.
         ]
 
         defer.returnValue(ret)
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 52c37c76f5..8ea5756d61 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -40,7 +40,6 @@ class RoomMemberStore(SQLBaseStore):
         """
         try:
             target_user_id = event.state_key
-            domain = UserID.from_string(target_user_id).domain
         except:
             logger.exception(
                 "Failed to parse target_user_id=%s", target_user_id
@@ -65,42 +64,8 @@ class RoomMemberStore(SQLBaseStore):
             }
         )
 
-        # Update room hosts table
-        if event.membership == Membership.JOIN:
-            sql = (
-                "INSERT OR IGNORE INTO room_hosts (room_id, host) "
-                "VALUES (?, ?)"
-            )
-            txn.execute(sql, (event.room_id, domain))
-        elif event.membership != Membership.INVITE:
-            # Check if this was the last person to have left.
-            member_events = self._get_members_query_txn(
-                txn,
-                where_clause=("c.room_id = ? AND m.membership = ?"
-                              " AND m.user_id != ?"),
-                where_values=(event.room_id, Membership.JOIN, target_user_id,)
-            )
-
-            joined_domains = set()
-            for e in member_events:
-                try:
-                    joined_domains.add(
-                        UserID.from_string(e.state_key).domain
-                    )
-                except:
-                    # FIXME: How do we deal with invalid user ids in the db?
-                    logger.exception("Invalid user_id: %s", event.state_key)
-
-            if domain not in joined_domains:
-                sql = (
-                    "DELETE FROM room_hosts WHERE room_id = ? AND host = ?"
-                )
-
-                txn.execute(sql, (event.room_id, domain))
-
         self.get_rooms_for_user.invalidate(target_user_id)
 
-    @defer.inlineCallbacks
     def get_room_member(self, user_id, room_id):
         """Retrieve the current state of a room member.
 
@@ -110,41 +75,27 @@ class RoomMemberStore(SQLBaseStore):
         Returns:
             Deferred: Results in a MembershipEvent or None.
         """
-        rows = yield self._get_members_by_dict({
-            "e.room_id": room_id,
-            "m.user_id": user_id,
-        })
+        def f(txn):
+            events = self._get_members_events_txn(
+                txn,
+                room_id,
+                user_id=user_id,
+            )
 
-        defer.returnValue(rows[0] if rows else None)
+            return events[0] if events else None
 
-    def _get_room_member(self, txn, user_id, room_id):
-        sql = (
-            "SELECT e.* FROM events as e"
-            " INNER JOIN room_memberships as m"
-            " ON e.event_id = m.event_id"
-            " INNER JOIN current_state_events as c"
-            " ON m.event_id = c.event_id"
-            " WHERE m.user_id = ? and e.room_id = ?"
-            " LIMIT 1"
-        )
-        txn.execute(sql, (user_id, room_id))
-        rows = self.cursor_to_dict(txn)
-        if rows:
-            return self._parse_events_txn(txn, rows)[0]
-        else:
-            return None
+        return self.runInteraction("get_room_member", f)
 
     def get_users_in_room(self, room_id):
         def f(txn):
-            sql = (
-                "SELECT m.user_id FROM room_memberships as m"
-                " INNER JOIN current_state_events as c"
-                " ON m.event_id = c.event_id"
-                " WHERE m.membership = ? AND m.room_id = ?"
+
+            rows = self._get_members_rows_txn(
+                txn,
+                room_id=room_id,
+                membership=Membership.JOIN,
             )
 
-            txn.execute(sql, (Membership.JOIN, room_id))
-            return [r[0] for r in txn.fetchall()]
+            return [r["user_id"] for r in rows]
         return self.runInteraction("get_users_in_room", f)
 
     def get_room_members(self, room_id, membership=None):
@@ -159,11 +110,14 @@ class RoomMemberStore(SQLBaseStore):
             list of namedtuples representing the members in this room.
         """
 
-        where = {"m.room_id": room_id}
-        if membership:
-            where["m.membership"] = membership
+        def f(txn):
+            return self._get_members_events_txn(
+                txn,
+                room_id,
+                membership=membership,
+            )
 
-        return self._get_members_by_dict(where)
+        return self.runInteraction("get_room_members", f)
 
     def get_rooms_for_user_where_membership_is(self, user_id, membership_list):
         """ Get all the rooms for this user where the membership for this user
@@ -209,32 +163,55 @@ class RoomMemberStore(SQLBaseStore):
         ]
 
     def get_joined_hosts_for_room(self, room_id):
-        return self._simple_select_onecol(
-            "room_hosts",
-            {"room_id": room_id},
-            "host",
-            desc="get_joined_hosts_for_room",
+        return self.runInteraction(
+            "get_joined_hosts_for_room",
+            self._get_joined_hosts_for_room_txn,
+            room_id,
+        )
+
+    def _get_joined_hosts_for_room_txn(self, txn, room_id):
+        rows = self._get_members_rows_txn(
+            txn,
+            room_id, membership=Membership.JOIN
+        )
+
+        joined_domains = set(
+            UserID.from_string(r["user_id"]).domain
+            for r in rows
         )
 
-    def _get_members_by_dict(self, where_dict):
-        clause = " AND ".join("%s = ?" % k for k in where_dict.keys())
-        vals = where_dict.values()
-        return self._get_members_query(clause, vals)
+        return joined_domains
 
     def _get_members_query(self, where_clause, where_values):
         return self.runInteraction(
-            "get_members_query", self._get_members_query_txn,
+            "get_members_query", self._get_members_events_txn,
             where_clause, where_values
         )
 
-    def _get_members_query_txn(self, txn, where_clause, where_values):
+    def _get_members_events_txn(self, txn, room_id, membership=None, user_id=None):
+        rows = self._get_members_rows_txn(
+            txn,
+            room_id, membership, user_id,
+        )
+        return self._get_events_txn(txn, [r["event_id"] for r in rows])
+
+    def _get_members_rows_txn(self, txn, room_id, membership=None, user_id=None):
+        where_clause = "c.room_id = ?"
+        where_values = [room_id]
+
+        if membership:
+            where_clause += " AND m.membership = ?"
+            where_values.append(membership)
+
+        if user_id:
+            where_clause += " AND m.user_id = ?"
+            where_values.append(user_id)
+
         sql = (
-            "SELECT e.* FROM events as e "
-            "INNER JOIN room_memberships as m "
-            "ON e.event_id = m.event_id "
-            "INNER JOIN current_state_events as c "
-            "ON m.event_id = c.event_id "
-            "WHERE %(where)s "
+            "SELECT m.* FROM room_memberships as m"
+            " INNER JOIN current_state_events as c"
+            " ON m.event_id = c.event_id"
+            " WHERE %(where)s"
         ) % {
             "where": where_clause,
         }
@@ -242,8 +219,7 @@ class RoomMemberStore(SQLBaseStore):
         txn.execute(sql, where_values)
         rows = self.cursor_to_dict(txn)
 
-        results = self._parse_events_txn(txn, rows)
-        return results
+        return rows
 
     @cached()
     def get_rooms_for_user(self, user_id):
diff --git a/synapse/storage/schema/delta/12/v12.sql b/synapse/storage/schema/delta/12/v12.sql
index b87ef1fe79..a246943f5a 100644
--- a/synapse/storage/schema/delta/12/v12.sql
+++ b/synapse/storage/schema/delta/12/v12.sql
@@ -14,54 +14,50 @@
  */
 
 CREATE TABLE IF NOT EXISTS rejections(
-    event_id TEXT NOT NULL,
-    reason TEXT NOT NULL,
-    last_check TEXT NOT NULL,
-    CONSTRAINT ev_id UNIQUE (event_id) ON CONFLICT REPLACE
+    event_id VARCHAR(150) NOT NULL,
+    reason VARCHAR(150) NOT NULL,
+    last_check VARCHAR(150) NOT NULL,
+    UNIQUE (event_id)
 );
 
 -- Push notification endpoints that users have configured
 CREATE TABLE IF NOT EXISTS pushers (
   id INTEGER PRIMARY KEY AUTOINCREMENT,
-  user_name TEXT NOT NULL,
-  profile_tag varchar(32) NOT NULL,
-  kind varchar(8) NOT NULL,
-  app_id varchar(64) NOT NULL,
-  app_display_name varchar(64) NOT NULL,
-  device_display_name varchar(128) NOT NULL,
-  pushkey blob NOT NULL,
-  ts BIGINT NOT NULL,
-  lang varchar(8),
-  data blob,
+  user_name VARCHAR(150) NOT NULL,
+  profile_tag VARCHAR(32) NOT NULL,
+  kind VARCHAR(8) NOT NULL,
+  app_id VARCHAR(64) NOT NULL,
+  app_display_name VARCHAR(64) NOT NULL,
+  device_display_name VARCHAR(128) NOT NULL,
+  pushkey VARBINARY(512) NOT NULL,
+  ts BIGINT UNSIGNED NOT NULL,
+  lang VARCHAR(8),
+  data LONGBLOB,
   last_token TEXT,
-  last_success BIGINT,
-  failing_since BIGINT,
-  FOREIGN KEY(user_name) REFERENCES users(name),
+  last_success BIGINT UNSIGNED,
+  failing_since BIGINT UNSIGNED,
   UNIQUE (app_id, pushkey)
 );
 
 CREATE TABLE IF NOT EXISTS push_rules (
   id INTEGER PRIMARY KEY AUTOINCREMENT,
-  user_name TEXT NOT NULL,
-  rule_id TEXT NOT NULL,
+  user_name VARCHAR(150) NOT NULL,
+  rule_id VARCHAR(150) NOT NULL,
   priority_class TINYINT NOT NULL,
   priority INTEGER NOT NULL DEFAULT 0,
-  conditions TEXT NOT NULL,
-  actions TEXT NOT NULL,
+  conditions VARCHAR(150) NOT NULL,
+  actions VARCHAR(150) NOT NULL,
   UNIQUE(user_name, rule_id)
 );
 
 CREATE INDEX IF NOT EXISTS push_rules_user_name on push_rules (user_name);
 
 CREATE TABLE IF NOT EXISTS user_filters(
-  user_id TEXT,
-  filter_id INTEGER,
-  filter_json TEXT,
-  FOREIGN KEY(user_id) REFERENCES users(id)
+  user_id VARCHAR(150),
+  filter_id BIGINT UNSIGNED,
+  filter_json LONGBLOB
 );
 
 CREATE INDEX IF NOT EXISTS user_filters_by_user_id_filter_id ON user_filters(
-  user_id, filter_id
+    user_id, filter_id
 );
-
-PRAGMA user_version = 12;
diff --git a/synapse/storage/schema/delta/13/v13.sql b/synapse/storage/schema/delta/13/v13.sql
index e491ad5aec..d1da2b48e2 100644
--- a/synapse/storage/schema/delta/13/v13.sql
+++ b/synapse/storage/schema/delta/13/v13.sql
@@ -15,20 +15,17 @@
 
 CREATE TABLE IF NOT EXISTS application_services(
     id INTEGER PRIMARY KEY AUTOINCREMENT,
-    url TEXT,
-    token TEXT,
-    hs_token TEXT,
-    sender TEXT,
-    UNIQUE(token) ON CONFLICT ROLLBACK
+    url VARCHAR(150),
+    token VARCHAR(150),
+    hs_token VARCHAR(150),
+    sender VARCHAR(150),
+    UNIQUE(token)
 );
 
 CREATE TABLE IF NOT EXISTS application_services_regex(
     id INTEGER PRIMARY KEY AUTOINCREMENT,
-    as_id INTEGER NOT NULL,
+    as_id BIGINT UNSIGNED NOT NULL,
     namespace INTEGER,  /* enum[room_id|room_alias|user_id] */
-    regex TEXT,
+    regex VARCHAR(150),
     FOREIGN KEY(as_id) REFERENCES application_services(id)
 );
-
-
-
diff --git a/synapse/storage/schema/delta/14/v14.sql b/synapse/storage/schema/delta/14/v14.sql
index 0212726448..8c47d4b0f4 100644
--- a/synapse/storage/schema/delta/14/v14.sql
+++ b/synapse/storage/schema/delta/14/v14.sql
@@ -1,7 +1,7 @@
 CREATE TABLE IF NOT EXISTS push_rules_enable (
   id INTEGER PRIMARY KEY AUTOINCREMENT,
-  user_name TEXT NOT NULL,
-  rule_id TEXT NOT NULL,
+  user_name VARCHAR(150) NOT NULL,
+  rule_id VARCHAR(150) NOT NULL,
   enabled TINYINT,
   UNIQUE(user_name, rule_id)
 );
diff --git a/synapse/storage/schema/delta/15/appservice_txns.sql b/synapse/storage/schema/delta/15/appservice_txns.sql
index 2b27e2a429..1c3324f415 100644
--- a/synapse/storage/schema/delta/15/appservice_txns.sql
+++ b/synapse/storage/schema/delta/15/appservice_txns.sql
@@ -14,17 +14,18 @@
  */
 
 CREATE TABLE IF NOT EXISTS application_services_state(
-    as_id TEXT PRIMARY KEY,
-    state TEXT,
-    last_txn TEXT
+    as_id VARCHAR(150) PRIMARY KEY,
+    state VARCHAR(5),
+    last_txn INTEGER
 );
 
 CREATE TABLE IF NOT EXISTS application_services_txns(
-    as_id TEXT NOT NULL,
+    as_id VARCHAR(150) NOT NULL,
     txn_id INTEGER NOT NULL,
     event_ids TEXT NOT NULL,
-    UNIQUE(as_id, txn_id) ON CONFLICT ROLLBACK
+    UNIQUE(as_id, txn_id)
 );
 
-
-
+CREATE INDEX IF NOT EXISTS application_services_txns_id ON application_services_txns (
+    as_id
+);
diff --git a/synapse/storage/schema/delta/15/presence_indices.sql b/synapse/storage/schema/delta/15/presence_indices.sql
new file mode 100644
index 0000000000..6b8d0f1ca7
--- /dev/null
+++ b/synapse/storage/schema/delta/15/presence_indices.sql
@@ -0,0 +1,2 @@
+
+CREATE INDEX IF NOT EXISTS presence_list_user_id ON presence_list (user_id);
diff --git a/synapse/storage/schema/delta/16/events_order_index.sql b/synapse/storage/schema/delta/16/events_order_index.sql
new file mode 100644
index 0000000000..a48f215170
--- /dev/null
+++ b/synapse/storage/schema/delta/16/events_order_index.sql
@@ -0,0 +1,4 @@
+CREATE INDEX events_order ON events (topological_ordering, stream_ordering);
+CREATE INDEX events_order_room ON events (
+    room_id, topological_ordering, stream_ordering
+);
diff --git a/synapse/storage/schema/delta/16/remote_media_cache_index.sql b/synapse/storage/schema/delta/16/remote_media_cache_index.sql
new file mode 100644
index 0000000000..7a15265cb1
--- /dev/null
+++ b/synapse/storage/schema/delta/16/remote_media_cache_index.sql
@@ -0,0 +1,2 @@
+CREATE INDEX IF NOT EXISTS remote_media_cache_thumbnails_media_id
+    ON remote_media_cache_thumbnails (media_id);
\ No newline at end of file
diff --git a/synapse/storage/schema/delta/16/remove_duplicates.sql b/synapse/storage/schema/delta/16/remove_duplicates.sql
new file mode 100644
index 0000000000..65c97b5e2f
--- /dev/null
+++ b/synapse/storage/schema/delta/16/remove_duplicates.sql
@@ -0,0 +1,9 @@
+
+
+DELETE FROM event_to_state_groups WHERE state_group not in (
+    SELECT MAX(state_group) FROM event_to_state_groups GROUP BY event_id
+);
+
+DELETE FROM event_to_state_groups WHERE rowid not in (
+    SELECT MIN(rowid) FROM event_to_state_groups GROUP BY event_id
+);
diff --git a/synapse/storage/schema/delta/16/room_alias_index.sql b/synapse/storage/schema/delta/16/room_alias_index.sql
new file mode 100644
index 0000000000..f82486132b
--- /dev/null
+++ b/synapse/storage/schema/delta/16/room_alias_index.sql
@@ -0,0 +1,3 @@
+
+CREATE INDEX IF NOT EXISTS room_aliases_id ON room_aliases(room_id);
+CREATE INDEX IF NOT EXISTS room_alias_servers_alias ON room_alias_servers(room_alias);
diff --git a/synapse/storage/schema/delta/16/unique_constraints.sql b/synapse/storage/schema/delta/16/unique_constraints.sql
new file mode 100644
index 0000000000..fecf11118c
--- /dev/null
+++ b/synapse/storage/schema/delta/16/unique_constraints.sql
@@ -0,0 +1,80 @@
+
+-- We can use SQLite features here, since other db support was only added in v16
+
+--
+DELETE FROM current_state_events WHERE rowid not in (
+    SELECT MIN(rowid) FROM current_state_events GROUP BY event_id
+);
+
+DROP INDEX IF EXISTS current_state_events_event_id;
+CREATE UNIQUE INDEX current_state_events_event_id ON current_state_events(event_id);
+
+--
+DELETE FROM room_memberships WHERE rowid not in (
+    SELECT MIN(rowid) FROM room_memberships GROUP BY event_id
+);
+
+DROP INDEX IF EXISTS room_memberships_event_id;
+CREATE UNIQUE INDEX room_memberships_event_id ON room_memberships(event_id);
+
+--
+DELETE FROM feedback WHERE rowid not in (
+    SELECT MIN(rowid) FROM feedback GROUP BY event_id
+);
+
+DROP INDEX IF EXISTS feedback_event_id;
+CREATE UNIQUE INDEX feedback_event_id ON feedback(event_id);
+
+--
+DELETE FROM topics WHERE rowid not in (
+    SELECT MIN(rowid) FROM topics GROUP BY event_id
+);
+
+DROP INDEX IF EXISTS topics_event_id;
+CREATE UNIQUE INDEX topics_event_id ON topics(event_id);
+
+--
+DELETE FROM room_names WHERE rowid not in (
+    SELECT MIN(rowid) FROM room_names GROUP BY event_id
+);
+
+DROP INDEX IF EXISTS room_names_id;
+CREATE UNIQUE INDEX room_names_id ON room_names(event_id);
+
+--
+DELETE FROM presence WHERE rowid not in (
+    SELECT MIN(rowid) FROM presence GROUP BY user_id
+);
+
+DROP INDEX IF EXISTS presence_id;
+CREATE UNIQUE INDEX presence_id ON presence(user_id);
+
+--
+DELETE FROM presence_allow_inbound WHERE rowid not in (
+    SELECT MIN(rowid) FROM presence_allow_inbound
+    GROUP BY observed_user_id, observer_user_id
+);
+
+DROP INDEX IF EXISTS presence_allow_inbound_observers;
+CREATE UNIQUE INDEX presence_allow_inbound_observers ON presence_allow_inbound(
+    observed_user_id, observer_user_id
+);
+
+--
+DELETE FROM presence_list WHERE rowid not in (
+    SELECT MIN(rowid) FROM presence_list
+    GROUP BY user_id, observed_user_id
+);
+
+DROP INDEX IF EXISTS presence_list_observers;
+CREATE UNIQUE INDEX presence_list_observers ON presence_list(
+    user_id, observed_user_id
+);
+
+--
+DELETE FROM room_aliases WHERE rowid not in (
+    SELECT MIN(rowid) FROM room_aliases GROUP BY room_alias
+);
+
+DROP INDEX IF EXISTS room_aliases_id;
+CREATE INDEX room_aliases_id ON room_aliases(room_id);
diff --git a/synapse/storage/schema/delta/16/users.sql b/synapse/storage/schema/delta/16/users.sql
new file mode 100644
index 0000000000..db27bdca02
--- /dev/null
+++ b/synapse/storage/schema/delta/16/users.sql
@@ -0,0 +1,56 @@
+-- Convert `access_tokens`.user from rowids to user strings.
+-- MUST BE DONE BEFORE REMOVING ID COLUMN FROM USERS TABLE BELOW
+CREATE TABLE IF NOT EXISTS new_access_tokens(
+    id BIGINT UNSIGNED PRIMARY KEY,
+    user_id VARCHAR(150) NOT NULL,
+    device_id VARCHAR(150),
+    token VARCHAR(150) NOT NULL,
+    last_used BIGINT UNSIGNED,
+    UNIQUE(token)
+);
+
+INSERT INTO new_access_tokens
+    SELECT a.id, u.name, a.device_id, a.token, a.last_used
+    FROM access_tokens as a
+    INNER JOIN users as u ON u.id = a.user_id;
+
+DROP TABLE access_tokens;
+
+ALTER TABLE new_access_tokens RENAME TO access_tokens;
+
+-- Remove ID column from `users` table
+CREATE TABLE IF NOT EXISTS new_users(
+    name VARCHAR(150),
+    password_hash VARCHAR(150),
+    creation_ts BIGINT UNSIGNED,
+    admin BOOL DEFAULT 0 NOT NULL,
+    UNIQUE(name)
+);
+
+INSERT INTO new_users SELECT name, password_hash, creation_ts, admin FROM users;
+
+DROP TABLE users;
+
+ALTER TABLE new_users RENAME TO users;
+
+
+-- Remove UNIQUE constraint from `user_ips` table
+CREATE TABLE IF NOT EXISTS new_user_ips (
+    user_id VARCHAR(150) NOT NULL,
+    access_token VARCHAR(150) NOT NULL,
+    device_id VARCHAR(150),
+    ip VARCHAR(150) NOT NULL,
+    user_agent VARCHAR(150) NOT NULL,
+    last_seen BIGINT UNSIGNED NOT NULL
+);
+
+INSERT INTO new_user_ips
+    SELECT user, access_token, device_id, ip, user_agent, last_seen FROM user_ips;
+
+DROP TABLE user_ips;
+
+ALTER TABLE new_user_ips RENAME TO user_ips;
+
+CREATE INDEX IF NOT EXISTS user_ips_user ON user_ips(user_id);
+CREATE INDEX IF NOT EXISTS user_ips_user_ip ON user_ips(user_id, access_token, ip);
+
diff --git a/synapse/storage/schema/full_schemas/11/event_edges.sql b/synapse/storage/schema/full_schemas/11/event_edges.sql
index 1e766d6db2..05d0874f0d 100644
--- a/synapse/storage/schema/full_schemas/11/event_edges.sql
+++ b/synapse/storage/schema/full_schemas/11/event_edges.sql
@@ -14,76 +14,76 @@
  */
 
 CREATE TABLE IF NOT EXISTS event_forward_extremities(
-    event_id TEXT NOT NULL,
-    room_id TEXT NOT NULL,
-    CONSTRAINT uniqueness UNIQUE (event_id, room_id) ON CONFLICT REPLACE
+    event_id VARCHAR(150) NOT NULL,
+    room_id VARCHAR(150) NOT NULL,
+    UNIQUE (event_id, room_id)
 );
 
-CREATE INDEX IF NOT EXISTS ev_extrem_room ON event_forward_extremities(room_id);
-CREATE INDEX IF NOT EXISTS ev_extrem_id ON event_forward_extremities(event_id);
+CREATE INDEX ev_extrem_room ON event_forward_extremities(room_id);
+CREATE INDEX ev_extrem_id ON event_forward_extremities(event_id);
 
 
 CREATE TABLE IF NOT EXISTS event_backward_extremities(
-    event_id TEXT NOT NULL,
-    room_id TEXT NOT NULL,
-    CONSTRAINT uniqueness UNIQUE (event_id, room_id) ON CONFLICT REPLACE
+    event_id VARCHAR(150) NOT NULL,
+    room_id VARCHAR(150) NOT NULL,
+    UNIQUE (event_id, room_id)
 );
 
-CREATE INDEX IF NOT EXISTS ev_b_extrem_room ON event_backward_extremities(room_id);
-CREATE INDEX IF NOT EXISTS ev_b_extrem_id ON event_backward_extremities(event_id);
+CREATE INDEX ev_b_extrem_room ON event_backward_extremities(room_id);
+CREATE INDEX ev_b_extrem_id ON event_backward_extremities(event_id);
 
 
 CREATE TABLE IF NOT EXISTS event_edges(
-    event_id TEXT NOT NULL,
-    prev_event_id TEXT NOT NULL,
-    room_id TEXT NOT NULL,
-    is_state INTEGER NOT NULL,
-    CONSTRAINT uniqueness UNIQUE (event_id, prev_event_id, room_id, is_state)
+    event_id VARCHAR(150) NOT NULL,
+    prev_event_id VARCHAR(150) NOT NULL,
+    room_id VARCHAR(150) NOT NULL,
+    is_state BOOL NOT NULL,
+    UNIQUE (event_id, prev_event_id, room_id, is_state)
 );
 
-CREATE INDEX IF NOT EXISTS ev_edges_id ON event_edges(event_id);
-CREATE INDEX IF NOT EXISTS ev_edges_prev_id ON event_edges(prev_event_id);
+CREATE INDEX ev_edges_id ON event_edges(event_id);
+CREATE INDEX ev_edges_prev_id ON event_edges(prev_event_id);
 
 
 CREATE TABLE IF NOT EXISTS room_depth(
-    room_id TEXT NOT NULL,
+    room_id VARCHAR(150) NOT NULL,
     min_depth INTEGER NOT NULL,
-    CONSTRAINT uniqueness UNIQUE (room_id)
+    UNIQUE (room_id)
 );
 
-CREATE INDEX IF NOT EXISTS room_depth_room ON room_depth(room_id);
+CREATE INDEX room_depth_room ON room_depth(room_id);
 
 
 create TABLE IF NOT EXISTS event_destinations(
-    event_id TEXT NOT NULL,
-    destination TEXT NOT NULL,
-    delivered_ts INTEGER DEFAULT 0, -- or 0 if not delivered
-    CONSTRAINT uniqueness UNIQUE (event_id, destination) ON CONFLICT REPLACE
+    event_id VARCHAR(150) NOT NULL,
+    destination VARCHAR(150) NOT NULL,
+    delivered_ts BIGINT DEFAULT 0, -- or 0 if not delivered
+    UNIQUE (event_id, destination)
 );
 
-CREATE INDEX IF NOT EXISTS event_destinations_id ON event_destinations(event_id);
+CREATE INDEX event_destinations_id ON event_destinations(event_id);
 
 
 CREATE TABLE IF NOT EXISTS state_forward_extremities(
-    event_id TEXT NOT NULL,
-    room_id TEXT NOT NULL,
-    type TEXT NOT NULL,
-    state_key TEXT NOT NULL,
-    CONSTRAINT uniqueness UNIQUE (event_id, room_id) ON CONFLICT REPLACE
+    event_id VARCHAR(150) NOT NULL,
+    room_id VARCHAR(150) NOT NULL,
+    type VARCHAR(150) NOT NULL,
+    state_key VARCHAR(150) NOT NULL,
+    UNIQUE (event_id, room_id)
 );
 
-CREATE INDEX IF NOT EXISTS st_extrem_keys ON state_forward_extremities(
+CREATE INDEX st_extrem_keys ON state_forward_extremities(
     room_id, type, state_key
 );
-CREATE INDEX IF NOT EXISTS st_extrem_id ON state_forward_extremities(event_id);
+CREATE INDEX st_extrem_id ON state_forward_extremities(event_id);
 
 
 CREATE TABLE IF NOT EXISTS event_auth(
-    event_id TEXT NOT NULL,
-    auth_id TEXT NOT NULL,
-    room_id TEXT NOT NULL,
-    CONSTRAINT uniqueness UNIQUE (event_id, auth_id, room_id)
+    event_id VARCHAR(150) NOT NULL,
+    auth_id VARCHAR(150) NOT NULL,
+    room_id VARCHAR(150) NOT NULL,
+    UNIQUE (event_id, auth_id, room_id)
 );
 
-CREATE INDEX IF NOT EXISTS evauth_edges_id ON event_auth(event_id);
-CREATE INDEX IF NOT EXISTS evauth_edges_auth_id ON event_auth(auth_id);
\ No newline at end of file
+CREATE INDEX evauth_edges_id ON event_auth(event_id);
+CREATE INDEX evauth_edges_auth_id ON event_auth(auth_id);
diff --git a/synapse/storage/schema/full_schemas/11/event_signatures.sql b/synapse/storage/schema/full_schemas/11/event_signatures.sql
index c28c39c48a..4291827368 100644
--- a/synapse/storage/schema/full_schemas/11/event_signatures.sql
+++ b/synapse/storage/schema/full_schemas/11/event_signatures.sql
@@ -14,52 +14,42 @@
  */
 
 CREATE TABLE IF NOT EXISTS event_content_hashes (
-    event_id TEXT,
-    algorithm TEXT,
-    hash BLOB,
-    CONSTRAINT uniqueness UNIQUE (event_id, algorithm)
+    event_id VARCHAR(150),
+    algorithm VARCHAR(150),
+    hash bytea,
+    UNIQUE (event_id, algorithm)
 );
 
-CREATE INDEX IF NOT EXISTS event_content_hashes_id ON event_content_hashes(
-    event_id
-);
+CREATE INDEX event_content_hashes_id ON event_content_hashes(event_id);
 
 
 CREATE TABLE IF NOT EXISTS event_reference_hashes (
-    event_id TEXT,
-    algorithm TEXT,
-    hash BLOB,
-    CONSTRAINT uniqueness UNIQUE (event_id, algorithm)
+    event_id VARCHAR(150),
+    algorithm VARCHAR(150),
+    hash bytea,
+    UNIQUE (event_id, algorithm)
 );
 
-CREATE INDEX IF NOT EXISTS event_reference_hashes_id ON event_reference_hashes (
-    event_id
-);
+CREATE INDEX event_reference_hashes_id ON event_reference_hashes(event_id);
 
 
 CREATE TABLE IF NOT EXISTS event_signatures (
-    event_id TEXT,
-    signature_name TEXT,
-    key_id TEXT,
-    signature BLOB,
-    CONSTRAINT uniqueness UNIQUE (event_id, signature_name, key_id)
+    event_id VARCHAR(150),
+    signature_name VARCHAR(150),
+    key_id VARCHAR(150),
+    signature bytea,
+    UNIQUE (event_id, signature_name, key_id)
 );
 
-CREATE INDEX IF NOT EXISTS event_signatures_id ON event_signatures (
-    event_id
-);
+CREATE INDEX event_signatures_id ON event_signatures(event_id);
 
 
 CREATE TABLE IF NOT EXISTS event_edge_hashes(
-    event_id TEXT,
-    prev_event_id TEXT,
-    algorithm TEXT,
-    hash BLOB,
-    CONSTRAINT uniqueness UNIQUE (
-        event_id, prev_event_id, algorithm
-    )
+    event_id VARCHAR(150),
+    prev_event_id VARCHAR(150),
+    algorithm VARCHAR(150),
+    hash bytea,
+    UNIQUE (event_id, prev_event_id, algorithm)
 );
 
-CREATE INDEX IF NOT EXISTS event_edge_hashes_id ON event_edge_hashes(
-    event_id
-);
+CREATE INDEX event_edge_hashes_id ON event_edge_hashes(event_id);
diff --git a/synapse/storage/schema/full_schemas/11/im.sql b/synapse/storage/schema/full_schemas/11/im.sql
index dd00c1cd2f..addbec5885 100644
--- a/synapse/storage/schema/full_schemas/11/im.sql
+++ b/synapse/storage/schema/full_schemas/11/im.sql
@@ -15,111 +15,109 @@
 
 CREATE TABLE IF NOT EXISTS events(
     stream_ordering INTEGER PRIMARY KEY AUTOINCREMENT,
-    topological_ordering INTEGER NOT NULL,
-    event_id TEXT NOT NULL,
-    type TEXT NOT NULL,
-    room_id TEXT NOT NULL,
+    topological_ordering BIGINT NOT NULL,
+    event_id VARCHAR(150) NOT NULL,
+    type VARCHAR(150) NOT NULL,
+    room_id VARCHAR(150) NOT NULL,
     content TEXT NOT NULL,
     unrecognized_keys TEXT,
     processed BOOL NOT NULL,
     outlier BOOL NOT NULL,
-    depth INTEGER DEFAULT 0 NOT NULL,
-    CONSTRAINT ev_uniq UNIQUE (event_id)
+    depth BIGINT DEFAULT 0 NOT NULL,
+    UNIQUE (event_id)
 );
 
-CREATE INDEX IF NOT EXISTS events_event_id ON events (event_id);
-CREATE INDEX IF NOT EXISTS events_stream_ordering ON events (stream_ordering);
-CREATE INDEX IF NOT EXISTS events_topological_ordering ON events (topological_ordering);
-CREATE INDEX IF NOT EXISTS events_room_id ON events (room_id);
+CREATE INDEX events_stream_ordering ON events (stream_ordering);
+CREATE INDEX events_topological_ordering ON events (topological_ordering);
+CREATE INDEX events_room_id ON events (room_id);
 
 
 CREATE TABLE IF NOT EXISTS event_json(
-    event_id TEXT NOT NULL,
-    room_id TEXT NOT NULL,
-    internal_metadata NOT NULL,
-    json BLOB NOT NULL,
-    CONSTRAINT ev_j_uniq UNIQUE (event_id)
+    event_id VARCHAR(150) NOT NULL,
+    room_id VARCHAR(150) NOT NULL,
+    internal_metadata TEXT NOT NULL,
+    json TEXT NOT NULL,
+    UNIQUE (event_id)
 );
 
-CREATE INDEX IF NOT EXISTS event_json_id ON event_json(event_id);
-CREATE INDEX IF NOT EXISTS event_json_room_id ON event_json(room_id);
+CREATE INDEX event_json_room_id ON event_json(room_id);
 
 
 CREATE TABLE IF NOT EXISTS state_events(
-    event_id TEXT NOT NULL,
-    room_id TEXT NOT NULL,
-    type TEXT NOT NULL,
-    state_key TEXT NOT NULL,
-    prev_state TEXT
+    event_id VARCHAR(150) NOT NULL,
+    room_id VARCHAR(150) NOT NULL,
+    type VARCHAR(150) NOT NULL,
+    state_key VARCHAR(150) NOT NULL,
+    prev_state VARCHAR(150),
+    UNIQUE (event_id)
 );
 
-CREATE UNIQUE INDEX IF NOT EXISTS state_events_event_id ON state_events (event_id);
-CREATE INDEX IF NOT EXISTS state_events_room_id ON state_events (room_id);
-CREATE INDEX IF NOT EXISTS state_events_type ON state_events (type);
-CREATE INDEX IF NOT EXISTS state_events_state_key ON state_events (state_key);
+CREATE INDEX state_events_room_id ON state_events (room_id);
+CREATE INDEX state_events_type ON state_events (type);
+CREATE INDEX state_events_state_key ON state_events (state_key);
 
 
 CREATE TABLE IF NOT EXISTS current_state_events(
-    event_id TEXT NOT NULL,
-    room_id TEXT NOT NULL,
-    type TEXT NOT NULL,
-    state_key TEXT NOT NULL,
-    CONSTRAINT curr_uniq UNIQUE (room_id, type, state_key) ON CONFLICT REPLACE
+    event_id VARCHAR(150) NOT NULL,
+    room_id VARCHAR(150) NOT NULL,
+    type VARCHAR(150) NOT NULL,
+    state_key VARCHAR(150) NOT NULL,
+    UNIQUE (room_id, type, state_key)
 );
 
-CREATE INDEX IF NOT EXISTS curr_events_event_id ON current_state_events (event_id);
-CREATE INDEX IF NOT EXISTS current_state_events_room_id ON current_state_events (room_id);
-CREATE INDEX IF NOT EXISTS current_state_events_type ON current_state_events (type);
-CREATE INDEX IF NOT EXISTS current_state_events_state_key ON current_state_events (state_key);
+CREATE INDEX curr_events_event_id ON current_state_events (event_id);
+CREATE INDEX current_state_events_room_id ON current_state_events (room_id);
+CREATE INDEX current_state_events_type ON current_state_events (type);
+CREATE INDEX current_state_events_state_key ON current_state_events (state_key);
 
 CREATE TABLE IF NOT EXISTS room_memberships(
-    event_id TEXT NOT NULL,
-    user_id TEXT NOT NULL,
-    sender TEXT NOT NULL,
-    room_id TEXT NOT NULL,
-    membership TEXT NOT NULL
+    event_id VARCHAR(150) NOT NULL,
+    user_id VARCHAR(150) NOT NULL,
+    sender VARCHAR(150) NOT NULL,
+    room_id VARCHAR(150) NOT NULL,
+    membership VARCHAR(150) NOT NULL
 );
 
-CREATE INDEX IF NOT EXISTS room_memberships_event_id ON room_memberships (event_id);
-CREATE INDEX IF NOT EXISTS room_memberships_room_id ON room_memberships (room_id);
-CREATE INDEX IF NOT EXISTS room_memberships_user_id ON room_memberships (user_id);
+CREATE INDEX room_memberships_event_id ON room_memberships (event_id);
+CREATE INDEX room_memberships_room_id ON room_memberships (room_id);
+CREATE INDEX room_memberships_user_id ON room_memberships (user_id);
 
 CREATE TABLE IF NOT EXISTS feedback(
-    event_id TEXT NOT NULL,
-    feedback_type TEXT,
-    target_event_id TEXT,
-    sender TEXT,
-    room_id TEXT
+    event_id VARCHAR(150) NOT NULL,
+    feedback_type VARCHAR(150),
+    target_event_id VARCHAR(150),
+    sender VARCHAR(150),
+    room_id VARCHAR(150)
 );
 
 CREATE TABLE IF NOT EXISTS topics(
-    event_id TEXT NOT NULL,
-    room_id TEXT NOT NULL,
+    event_id VARCHAR(150) NOT NULL,
+    room_id VARCHAR(150) NOT NULL,
     topic TEXT NOT NULL
 );
 
-CREATE INDEX IF NOT EXISTS topics_event_id ON topics(event_id);
-CREATE INDEX IF NOT EXISTS topics_room_id ON topics(room_id);
+CREATE INDEX topics_event_id ON topics(event_id);
+CREATE INDEX topics_room_id ON topics(room_id);
 
 CREATE TABLE IF NOT EXISTS room_names(
-    event_id TEXT NOT NULL,
-    room_id TEXT NOT NULL,
+    event_id VARCHAR(150) NOT NULL,
+    room_id VARCHAR(150) NOT NULL,
     name TEXT NOT NULL
 );
 
-CREATE INDEX IF NOT EXISTS room_names_event_id ON room_names(event_id);
-CREATE INDEX IF NOT EXISTS room_names_room_id ON room_names(room_id);
+CREATE INDEX room_names_event_id ON room_names(event_id);
+CREATE INDEX room_names_room_id ON room_names(room_id);
 
 CREATE TABLE IF NOT EXISTS rooms(
-    room_id TEXT PRIMARY KEY NOT NULL,
-    is_public INTEGER,
-    creator TEXT
+    room_id VARCHAR(150) PRIMARY KEY NOT NULL,
+    is_public BOOL,
+    creator VARCHAR(150)
 );
 
 CREATE TABLE IF NOT EXISTS room_hosts(
-    room_id TEXT NOT NULL,
-    host TEXT NOT NULL,
-    CONSTRAINT room_hosts_uniq UNIQUE (room_id, host) ON CONFLICT IGNORE
+    room_id VARCHAR(150) NOT NULL,
+    host VARCHAR(150) NOT NULL,
+    UNIQUE (room_id, host)
 );
 
-CREATE INDEX IF NOT EXISTS room_hosts_room_id ON room_hosts (room_id);
+CREATE INDEX room_hosts_room_id ON room_hosts (room_id);
diff --git a/synapse/storage/schema/full_schemas/11/keys.sql b/synapse/storage/schema/full_schemas/11/keys.sql
index a9e0a4fe0d..459b510427 100644
--- a/synapse/storage/schema/full_schemas/11/keys.sql
+++ b/synapse/storage/schema/full_schemas/11/keys.sql
@@ -13,19 +13,19 @@
  * limitations under the License.
  */
 CREATE TABLE IF NOT EXISTS server_tls_certificates(
-  server_name TEXT, -- Server name.
-  fingerprint TEXT, -- Certificate fingerprint.
-  from_server TEXT, -- Which key server the certificate was fetched from.
-  ts_added_ms INTEGER, -- When the certifcate was added.
-  tls_certificate BLOB, -- DER encoded x509 certificate.
-  CONSTRAINT uniqueness UNIQUE (server_name, fingerprint)
+  server_name VARCHAR(150), -- Server name.
+  fingerprint VARCHAR(150), -- Certificate fingerprint.
+  from_server VARCHAR(150), -- Which key server the certificate was fetched from.
+  ts_added_ms BIGINT, -- When the certifcate was added.
+  tls_certificate bytea, -- DER encoded x509 certificate.
+  UNIQUE (server_name, fingerprint)
 );
 
 CREATE TABLE IF NOT EXISTS server_signature_keys(
-  server_name TEXT, -- Server name.
-  key_id TEXT, -- Key version.
-  from_server TEXT, -- Which key server the key was fetched form.
-  ts_added_ms INTEGER, -- When the key was added.
-  verify_key BLOB, -- NACL verification key.
-  CONSTRAINT uniqueness UNIQUE (server_name, key_id)
+  server_name VARCHAR(150), -- Server name.
+  key_id VARCHAR(150), -- Key version.
+  from_server VARCHAR(150), -- Which key server the key was fetched form.
+  ts_added_ms BIGINT, -- When the key was added.
+  verify_key bytea, -- NACL verification key.
+  UNIQUE (server_name, key_id)
 );
diff --git a/synapse/storage/schema/full_schemas/11/media_repository.sql b/synapse/storage/schema/full_schemas/11/media_repository.sql
index afdf48cbfb..6e0ee0db41 100644
--- a/synapse/storage/schema/full_schemas/11/media_repository.sql
+++ b/synapse/storage/schema/full_schemas/11/media_repository.sql
@@ -14,55 +14,52 @@
  */
 
 CREATE TABLE IF NOT EXISTS local_media_repository (
-    media_id TEXT, -- The id used to refer to the media.
-    media_type TEXT, -- The MIME-type of the media.
+    media_id VARCHAR(150), -- The id used to refer to the media.
+    media_type VARCHAR(150), -- The MIME-type of the media.
     media_length INTEGER, -- Length of the media in bytes.
-    created_ts INTEGER, -- When the content was uploaded in ms.
-    upload_name TEXT, -- The name the media was uploaded with.
-    user_id TEXT, -- The user who uploaded the file.
-    CONSTRAINT uniqueness UNIQUE (media_id)
+    created_ts BIGINT, -- When the content was uploaded in ms.
+    upload_name VARCHAR(150), -- The name the media was uploaded with.
+    user_id VARCHAR(150), -- The user who uploaded the file.
+    UNIQUE (media_id)
 );
 
 CREATE TABLE IF NOT EXISTS local_media_repository_thumbnails (
-    media_id TEXT, -- The id used to refer to the media.
+    media_id VARCHAR(150), -- The id used to refer to the media.
     thumbnail_width INTEGER, -- The width of the thumbnail in pixels.
     thumbnail_height INTEGER, -- The height of the thumbnail in pixels.
-    thumbnail_type TEXT, -- The MIME-type of the thumbnail.
-    thumbnail_method TEXT, -- The method used to make the thumbnail.
+    thumbnail_type VARCHAR(150), -- The MIME-type of the thumbnail.
+    thumbnail_method VARCHAR(150), -- The method used to make the thumbnail.
     thumbnail_length INTEGER, -- The length of the thumbnail in bytes.
-    CONSTRAINT uniqueness UNIQUE (
+    UNIQUE (
         media_id, thumbnail_width, thumbnail_height, thumbnail_type
     )
 );
 
-CREATE INDEX IF NOT EXISTS local_media_repository_thumbnails_media_id
+CREATE INDEX local_media_repository_thumbnails_media_id
     ON local_media_repository_thumbnails (media_id);
 
 CREATE TABLE IF NOT EXISTS remote_media_cache (
-    media_origin TEXT, -- The remote HS the media came from.
-    media_id TEXT, -- The id used to refer to the media on that server.
-    media_type TEXT, -- The MIME-type of the media.
-    created_ts INTEGER, -- When the content was uploaded in ms.
-    upload_name TEXT, -- The name the media was uploaded with.
+    media_origin VARCHAR(150), -- The remote HS the media came from.
+    media_id VARCHAR(150), -- The id used to refer to the media on that server.
+    media_type VARCHAR(150), -- The MIME-type of the media.
+    created_ts BIGINT, -- When the content was uploaded in ms.
+    upload_name VARCHAR(150), -- The name the media was uploaded with.
     media_length INTEGER, -- Length of the media in bytes.
-    filesystem_id TEXT, -- The name used to store the media on disk.
-    CONSTRAINT uniqueness UNIQUE (media_origin, media_id)
+    filesystem_id VARCHAR(150), -- The name used to store the media on disk.
+    UNIQUE (media_origin, media_id)
 );
 
 CREATE TABLE IF NOT EXISTS remote_media_cache_thumbnails (
-    media_origin TEXT, -- The remote HS the media came from.
-    media_id TEXT, -- The id used to refer to the media.
+    media_origin VARCHAR(150), -- The remote HS the media came from.
+    media_id VARCHAR(150), -- The id used to refer to the media.
     thumbnail_width INTEGER, -- The width of the thumbnail in pixels.
     thumbnail_height INTEGER, -- The height of the thumbnail in pixels.
-    thumbnail_method TEXT, -- The method used to make the thumbnail
-    thumbnail_type TEXT, -- The MIME-type of the thumbnail.
+    thumbnail_method VARCHAR(150), -- The method used to make the thumbnail
+    thumbnail_type VARCHAR(150), -- The MIME-type of the thumbnail.
     thumbnail_length INTEGER, -- The length of the thumbnail in bytes.
-    filesystem_id TEXT, -- The name used to store the media on disk.
-    CONSTRAINT uniqueness UNIQUE (
+    filesystem_id VARCHAR(150), -- The name used to store the media on disk.
+    UNIQUE (
         media_origin, media_id, thumbnail_width, thumbnail_height,
-        thumbnail_type, thumbnail_type
-    )
+        thumbnail_type
+     )
 );
-
-CREATE INDEX IF NOT EXISTS remote_media_cache_thumbnails_media_id
-    ON local_media_repository_thumbnails (media_id);
diff --git a/synapse/storage/schema/full_schemas/11/presence.sql b/synapse/storage/schema/full_schemas/11/presence.sql
index f9f8db9697..fce324b890 100644
--- a/synapse/storage/schema/full_schemas/11/presence.sql
+++ b/synapse/storage/schema/full_schemas/11/presence.sql
@@ -13,26 +13,23 @@
  * limitations under the License.
  */
 CREATE TABLE IF NOT EXISTS presence(
-  user_id INTEGER NOT NULL,
-  state INTEGER,
-  status_msg TEXT,
-  mtime INTEGER, -- miliseconds since last state change
-  FOREIGN KEY(user_id) REFERENCES users(id)
+  user_id VARCHAR(150) NOT NULL,
+  state VARCHAR(20),
+  status_msg VARCHAR(150),
+  mtime BIGINT -- miliseconds since last state change
 );
 
 -- For each of /my/ users which possibly-remote users are allowed to see their
 -- presence state
 CREATE TABLE IF NOT EXISTS presence_allow_inbound(
-  observed_user_id INTEGER NOT NULL,
-  observer_user_id TEXT, -- a UserID,
-  FOREIGN KEY(observed_user_id) REFERENCES users(id)
+  observed_user_id VARCHAR(150) NOT NULL,
+  observer_user_id VARCHAR(150) NOT NULL -- a UserID,
 );
 
 -- For each of /my/ users (watcher), which possibly-remote users are they
 -- watching?
 CREATE TABLE IF NOT EXISTS presence_list(
-  user_id INTEGER NOT NULL,
-  observed_user_id TEXT, -- a UserID,
-  accepted BOOLEAN,
-  FOREIGN KEY(user_id) REFERENCES users(id)
+  user_id VARCHAR(150) NOT NULL,
+  observed_user_id VARCHAR(150) NOT NULL, -- a UserID,
+  accepted BOOLEAN NOT NULL
 );
diff --git a/synapse/storage/schema/full_schemas/11/profiles.sql b/synapse/storage/schema/full_schemas/11/profiles.sql
index f06a528b4d..ffe75edf9f 100644
--- a/synapse/storage/schema/full_schemas/11/profiles.sql
+++ b/synapse/storage/schema/full_schemas/11/profiles.sql
@@ -13,8 +13,7 @@
  * limitations under the License.
  */
 CREATE TABLE IF NOT EXISTS profiles(
-    user_id INTEGER NOT NULL,
-    displayname TEXT,
-    avatar_url TEXT,
-    FOREIGN KEY(user_id) REFERENCES users(id)
+    user_id VARCHAR(150) NOT NULL,
+    displayname VARCHAR(150),
+    avatar_url VARCHAR(150)
 );
diff --git a/synapse/storage/schema/full_schemas/11/redactions.sql b/synapse/storage/schema/full_schemas/11/redactions.sql
index 5011d95db8..492fd22033 100644
--- a/synapse/storage/schema/full_schemas/11/redactions.sql
+++ b/synapse/storage/schema/full_schemas/11/redactions.sql
@@ -13,10 +13,10 @@
  * limitations under the License.
  */
 CREATE TABLE IF NOT EXISTS redactions (
-    event_id TEXT NOT NULL,
-    redacts TEXT NOT NULL,
-    CONSTRAINT ev_uniq UNIQUE (event_id)
+    event_id VARCHAR(150) NOT NULL,
+    redacts VARCHAR(150) NOT NULL,
+    UNIQUE (event_id)
 );
 
-CREATE INDEX IF NOT EXISTS redactions_event_id ON redactions (event_id);
-CREATE INDEX IF NOT EXISTS redactions_redacts ON redactions (redacts);
+CREATE INDEX redactions_event_id ON redactions (event_id);
+CREATE INDEX redactions_redacts ON redactions (redacts);
diff --git a/synapse/storage/schema/full_schemas/11/room_aliases.sql b/synapse/storage/schema/full_schemas/11/room_aliases.sql
index 0d2df01603..6226913227 100644
--- a/synapse/storage/schema/full_schemas/11/room_aliases.sql
+++ b/synapse/storage/schema/full_schemas/11/room_aliases.sql
@@ -14,14 +14,11 @@
  */
 
 CREATE TABLE IF NOT EXISTS room_aliases(
-    room_alias TEXT NOT NULL,
-    room_id TEXT NOT NULL
+    room_alias VARCHAR(150) NOT NULL,
+    room_id VARCHAR(150) NOT NULL
 );
 
 CREATE TABLE IF NOT EXISTS room_alias_servers(
-    room_alias TEXT NOT NULL,
-    server TEXT NOT NULL
+    room_alias VARCHAR(150) NOT NULL,
+    server VARCHAR(150) NOT NULL
 );
-
-
-
diff --git a/synapse/storage/schema/full_schemas/11/state.sql b/synapse/storage/schema/full_schemas/11/state.sql
index 1fe8f1e430..62c20819fd 100644
--- a/synapse/storage/schema/full_schemas/11/state.sql
+++ b/synapse/storage/schema/full_schemas/11/state.sql
@@ -15,33 +15,26 @@
 
 CREATE TABLE IF NOT EXISTS state_groups(
     id INTEGER PRIMARY KEY,
-    room_id TEXT NOT NULL,
-    event_id TEXT NOT NULL
+    room_id VARCHAR(150) NOT NULL,
+    event_id VARCHAR(150) NOT NULL
 );
 
 CREATE TABLE IF NOT EXISTS state_groups_state(
     state_group INTEGER NOT NULL,
-    room_id TEXT NOT NULL,
-    type TEXT NOT NULL,
-    state_key TEXT NOT NULL,
-    event_id TEXT NOT NULL
+    room_id VARCHAR(150) NOT NULL,
+    type VARCHAR(150) NOT NULL,
+    state_key VARCHAR(150) NOT NULL,
+    event_id VARCHAR(150) NOT NULL
 );
 
 CREATE TABLE IF NOT EXISTS event_to_state_groups(
-    event_id TEXT NOT NULL,
+    event_id VARCHAR(150) NOT NULL,
     state_group INTEGER NOT NULL,
-    CONSTRAINT event_to_state_groups_uniq UNIQUE (event_id)
+    UNIQUE (event_id)
 );
 
-CREATE INDEX IF NOT EXISTS state_groups_id ON state_groups(id);
+CREATE INDEX state_groups_id ON state_groups(id);
 
-CREATE INDEX IF NOT EXISTS state_groups_state_id ON state_groups_state(
-    state_group
-);
-CREATE INDEX IF NOT EXISTS state_groups_state_tuple ON state_groups_state(
-    room_id, type, state_key
-);
-
-CREATE INDEX IF NOT EXISTS event_to_state_groups_id ON event_to_state_groups(
-    event_id
-);
\ No newline at end of file
+CREATE INDEX state_groups_state_id ON state_groups_state(state_group);
+CREATE INDEX state_groups_state_tuple ON state_groups_state(room_id, type, state_key);
+CREATE INDEX event_to_state_groups_id ON event_to_state_groups(event_id);
diff --git a/synapse/storage/schema/full_schemas/11/transactions.sql b/synapse/storage/schema/full_schemas/11/transactions.sql
index 2d30f99b06..524a696920 100644
--- a/synapse/storage/schema/full_schemas/11/transactions.sql
+++ b/synapse/storage/schema/full_schemas/11/transactions.sql
@@ -14,55 +14,50 @@
  */
 -- Stores what transaction ids we have received and what our response was
 CREATE TABLE IF NOT EXISTS received_transactions(
-    transaction_id TEXT, 
-    origin TEXT, 
-    ts INTEGER,
+    transaction_id VARCHAR(150),
+    origin VARCHAR(150),
+    ts BIGINT,
     response_code INTEGER,
-    response_json TEXT,
-    has_been_referenced BOOL default 0, -- Whether thishas been referenced by a prev_tx
-    CONSTRAINT uniquesss UNIQUE (transaction_id, origin) ON CONFLICT REPLACE
+    response_json bytea,
+    has_been_referenced SMALLINT DEFAULT 0, -- Whether thishas been referenced by a prev_tx
+    UNIQUE (transaction_id, origin)
 );
 
-CREATE UNIQUE INDEX IF NOT EXISTS transactions_txid ON received_transactions(transaction_id, origin);
-CREATE INDEX IF NOT EXISTS transactions_have_ref ON received_transactions(origin, has_been_referenced);-- WHERE has_been_referenced = 0;
+CREATE INDEX transactions_have_ref ON received_transactions(origin, has_been_referenced);-- WHERE has_been_referenced = 0;
 
 
 -- Stores what transactions we've sent, what their response was (if we got one) and whether we have
 -- since referenced the transaction in another outgoing transaction
 CREATE TABLE IF NOT EXISTS sent_transactions(
     id INTEGER PRIMARY KEY AUTOINCREMENT, -- This is used to apply insertion ordering
-    transaction_id TEXT,
-    destination TEXT,
+    transaction_id VARCHAR(150),
+    destination VARCHAR(150),
     response_code INTEGER DEFAULT 0,
     response_json TEXT,
-    ts INTEGER
+    ts BIGINT
 );
 
-CREATE INDEX IF NOT EXISTS sent_transaction_dest ON sent_transactions(destination);
-CREATE INDEX IF NOT EXISTS sent_transaction_dest_referenced ON sent_transactions(
-    destination
-);
-CREATE INDEX IF NOT EXISTS sent_transaction_txn_id ON sent_transactions(transaction_id);
+CREATE INDEX sent_transaction_dest ON sent_transactions(destination);
+CREATE INDEX sent_transaction_txn_id ON sent_transactions(transaction_id);
 -- So that we can do an efficient look up of all transactions that have yet to be successfully
 -- sent.
-CREATE INDEX IF NOT EXISTS sent_transaction_sent ON sent_transactions(response_code);
+CREATE INDEX sent_transaction_sent ON sent_transactions(response_code);
 
 
 -- For sent transactions only.
 CREATE TABLE IF NOT EXISTS transaction_id_to_pdu(
     transaction_id INTEGER,
-    destination TEXT,
-    pdu_id TEXT,
-    pdu_origin TEXT
+    destination VARCHAR(150),
+    pdu_id VARCHAR(150),
+    pdu_origin VARCHAR(150)
 );
 
-CREATE INDEX IF NOT EXISTS transaction_id_to_pdu_tx ON transaction_id_to_pdu(transaction_id, destination);
-CREATE INDEX IF NOT EXISTS transaction_id_to_pdu_dest ON transaction_id_to_pdu(destination);
-CREATE INDEX IF NOT EXISTS transaction_id_to_pdu_index ON transaction_id_to_pdu(transaction_id, destination);
+CREATE INDEX transaction_id_to_pdu_tx ON transaction_id_to_pdu(transaction_id, destination);
+CREATE INDEX transaction_id_to_pdu_dest ON transaction_id_to_pdu(destination);
 
 -- To track destination health
 CREATE TABLE IF NOT EXISTS destinations(
-    destination TEXT PRIMARY KEY,
-    retry_last_ts INTEGER,
+    destination VARCHAR(150) PRIMARY KEY,
+    retry_last_ts BIGINT,
     retry_interval INTEGER
 );
diff --git a/synapse/storage/schema/full_schemas/11/users.sql b/synapse/storage/schema/full_schemas/11/users.sql
index 08ccfdac0a..48a6aecfe8 100644
--- a/synapse/storage/schema/full_schemas/11/users.sql
+++ b/synapse/storage/schema/full_schemas/11/users.sql
@@ -14,32 +14,30 @@
  */
 CREATE TABLE IF NOT EXISTS users(
     id INTEGER PRIMARY KEY AUTOINCREMENT,
-    name TEXT,
-    password_hash TEXT,
-    creation_ts INTEGER,
-    admin BOOL DEFAULT 0 NOT NULL,
-    UNIQUE(name) ON CONFLICT ROLLBACK
+    name VARCHAR(150),
+    password_hash VARCHAR(150),
+    creation_ts BIGINT,
+    admin SMALLINT DEFAULT 0 NOT NULL,
+    UNIQUE(name)
 );
 
 CREATE TABLE IF NOT EXISTS access_tokens(
     id INTEGER PRIMARY KEY AUTOINCREMENT,
-    user_id INTEGER NOT NULL,
-    device_id TEXT,
-    token TEXT NOT NULL,
-    last_used INTEGER,
-    FOREIGN KEY(user_id) REFERENCES users(id),
-    UNIQUE(token) ON CONFLICT ROLLBACK
+    user_id VARCHAR(150) NOT NULL,
+    device_id VARCHAR(150),
+    token VARCHAR(150) NOT NULL,
+    last_used BIGINT,
+    UNIQUE(token)
 );
 
 CREATE TABLE IF NOT EXISTS user_ips (
-    user TEXT NOT NULL,
-    access_token TEXT NOT NULL,
-    device_id TEXT,
-    ip TEXT NOT NULL,
+    user VARCHAR(150) NOT NULL,
+    access_token VARCHAR(150) NOT NULL,
+    device_id VARCHAR(150),
+    ip VARCHAR(150) NOT NULL,
     user_agent TEXT NOT NULL,
-    last_seen INTEGER NOT NULL,
-    CONSTRAINT user_ip UNIQUE (user, access_token, ip, user_agent) ON CONFLICT REPLACE
+    last_seen BIGINT NOT NULL,
+    UNIQUE (user, access_token, ip, user_agent)
 );
 
-CREATE INDEX IF NOT EXISTS user_ips_user ON user_ips(user);
-
+CREATE INDEX user_ips_user ON user_ips(user);
diff --git a/synapse/storage/schema/full_schemas/16/application_services.sql b/synapse/storage/schema/full_schemas/16/application_services.sql
new file mode 100644
index 0000000000..5d63d57d59
--- /dev/null
+++ b/synapse/storage/schema/full_schemas/16/application_services.sql
@@ -0,0 +1,48 @@
+/* Copyright 2015 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE TABLE IF NOT EXISTS application_services(
+    id BIGINT PRIMARY KEY,
+    url VARCHAR(150),
+    token VARCHAR(150),
+    hs_token VARCHAR(150),
+    sender VARCHAR(150),
+    UNIQUE(token)
+);
+
+CREATE TABLE IF NOT EXISTS application_services_regex(
+    id BIGINT PRIMARY KEY,
+    as_id BIGINT NOT NULL,
+    namespace INTEGER,  /* enum[room_id|room_alias|user_id] */
+    regex VARCHAR(150),
+    FOREIGN KEY(as_id) REFERENCES application_services(id)
+);
+
+CREATE TABLE IF NOT EXISTS application_services_state(
+    as_id VARCHAR(150) PRIMARY KEY,
+    state VARCHAR(5),
+    last_txn INTEGER
+);
+
+CREATE TABLE IF NOT EXISTS application_services_txns(
+    as_id VARCHAR(150) NOT NULL,
+    txn_id INTEGER NOT NULL,
+    event_ids TEXT NOT NULL,
+    UNIQUE(as_id, txn_id)
+);
+
+CREATE INDEX application_services_txns_id ON application_services_txns (
+    as_id
+);
diff --git a/synapse/storage/schema/full_schemas/16/event_edges.sql b/synapse/storage/schema/full_schemas/16/event_edges.sql
new file mode 100644
index 0000000000..05d0874f0d
--- /dev/null
+++ b/synapse/storage/schema/full_schemas/16/event_edges.sql
@@ -0,0 +1,89 @@
+/* Copyright 2014, 2015 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE TABLE IF NOT EXISTS event_forward_extremities(
+    event_id VARCHAR(150) NOT NULL,
+    room_id VARCHAR(150) NOT NULL,
+    UNIQUE (event_id, room_id)
+);
+
+CREATE INDEX ev_extrem_room ON event_forward_extremities(room_id);
+CREATE INDEX ev_extrem_id ON event_forward_extremities(event_id);
+
+
+CREATE TABLE IF NOT EXISTS event_backward_extremities(
+    event_id VARCHAR(150) NOT NULL,
+    room_id VARCHAR(150) NOT NULL,
+    UNIQUE (event_id, room_id)
+);
+
+CREATE INDEX ev_b_extrem_room ON event_backward_extremities(room_id);
+CREATE INDEX ev_b_extrem_id ON event_backward_extremities(event_id);
+
+
+CREATE TABLE IF NOT EXISTS event_edges(
+    event_id VARCHAR(150) NOT NULL,
+    prev_event_id VARCHAR(150) NOT NULL,
+    room_id VARCHAR(150) NOT NULL,
+    is_state BOOL NOT NULL,
+    UNIQUE (event_id, prev_event_id, room_id, is_state)
+);
+
+CREATE INDEX ev_edges_id ON event_edges(event_id);
+CREATE INDEX ev_edges_prev_id ON event_edges(prev_event_id);
+
+
+CREATE TABLE IF NOT EXISTS room_depth(
+    room_id VARCHAR(150) NOT NULL,
+    min_depth INTEGER NOT NULL,
+    UNIQUE (room_id)
+);
+
+CREATE INDEX room_depth_room ON room_depth(room_id);
+
+
+create TABLE IF NOT EXISTS event_destinations(
+    event_id VARCHAR(150) NOT NULL,
+    destination VARCHAR(150) NOT NULL,
+    delivered_ts BIGINT DEFAULT 0, -- or 0 if not delivered
+    UNIQUE (event_id, destination)
+);
+
+CREATE INDEX event_destinations_id ON event_destinations(event_id);
+
+
+CREATE TABLE IF NOT EXISTS state_forward_extremities(
+    event_id VARCHAR(150) NOT NULL,
+    room_id VARCHAR(150) NOT NULL,
+    type VARCHAR(150) NOT NULL,
+    state_key VARCHAR(150) NOT NULL,
+    UNIQUE (event_id, room_id)
+);
+
+CREATE INDEX st_extrem_keys ON state_forward_extremities(
+    room_id, type, state_key
+);
+CREATE INDEX st_extrem_id ON state_forward_extremities(event_id);
+
+
+CREATE TABLE IF NOT EXISTS event_auth(
+    event_id VARCHAR(150) NOT NULL,
+    auth_id VARCHAR(150) NOT NULL,
+    room_id VARCHAR(150) NOT NULL,
+    UNIQUE (event_id, auth_id, room_id)
+);
+
+CREATE INDEX evauth_edges_id ON event_auth(event_id);
+CREATE INDEX evauth_edges_auth_id ON event_auth(auth_id);
diff --git a/synapse/storage/schema/full_schemas/16/event_signatures.sql b/synapse/storage/schema/full_schemas/16/event_signatures.sql
new file mode 100644
index 0000000000..4291827368
--- /dev/null
+++ b/synapse/storage/schema/full_schemas/16/event_signatures.sql
@@ -0,0 +1,55 @@
+/* Copyright 2014, 2015 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE TABLE IF NOT EXISTS event_content_hashes (
+    event_id VARCHAR(150),
+    algorithm VARCHAR(150),
+    hash bytea,
+    UNIQUE (event_id, algorithm)
+);
+
+CREATE INDEX event_content_hashes_id ON event_content_hashes(event_id);
+
+
+CREATE TABLE IF NOT EXISTS event_reference_hashes (
+    event_id VARCHAR(150),
+    algorithm VARCHAR(150),
+    hash bytea,
+    UNIQUE (event_id, algorithm)
+);
+
+CREATE INDEX event_reference_hashes_id ON event_reference_hashes(event_id);
+
+
+CREATE TABLE IF NOT EXISTS event_signatures (
+    event_id VARCHAR(150),
+    signature_name VARCHAR(150),
+    key_id VARCHAR(150),
+    signature bytea,
+    UNIQUE (event_id, signature_name, key_id)
+);
+
+CREATE INDEX event_signatures_id ON event_signatures(event_id);
+
+
+CREATE TABLE IF NOT EXISTS event_edge_hashes(
+    event_id VARCHAR(150),
+    prev_event_id VARCHAR(150),
+    algorithm VARCHAR(150),
+    hash bytea,
+    UNIQUE (event_id, prev_event_id, algorithm)
+);
+
+CREATE INDEX event_edge_hashes_id ON event_edge_hashes(event_id);
diff --git a/synapse/storage/schema/full_schemas/16/im.sql b/synapse/storage/schema/full_schemas/16/im.sql
new file mode 100644
index 0000000000..5b4b494484
--- /dev/null
+++ b/synapse/storage/schema/full_schemas/16/im.sql
@@ -0,0 +1,128 @@
+/* Copyright 2014, 2015 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE TABLE IF NOT EXISTS events(
+    stream_ordering INTEGER PRIMARY KEY,
+    topological_ordering BIGINT NOT NULL,
+    event_id VARCHAR(150) NOT NULL,
+    type VARCHAR(150) NOT NULL,
+    room_id VARCHAR(150) NOT NULL,
+    content TEXT NOT NULL,
+    unrecognized_keys TEXT,
+    processed BOOL NOT NULL,
+    outlier BOOL NOT NULL,
+    depth BIGINT DEFAULT 0 NOT NULL,
+    UNIQUE (event_id)
+);
+
+CREATE INDEX events_stream_ordering ON events (stream_ordering);
+CREATE INDEX events_topological_ordering ON events (topological_ordering);
+CREATE INDEX events_order ON events (topological_ordering, stream_ordering);
+CREATE INDEX events_room_id ON events (room_id);
+CREATE INDEX events_order_room ON events (
+    room_id, topological_ordering, stream_ordering
+);
+
+
+CREATE TABLE IF NOT EXISTS event_json(
+    event_id VARCHAR(150) NOT NULL,
+    room_id VARCHAR(150) NOT NULL,
+    internal_metadata TEXT NOT NULL,
+    json TEXT NOT NULL,
+    UNIQUE (event_id)
+);
+
+CREATE INDEX event_json_room_id ON event_json(room_id);
+
+
+CREATE TABLE IF NOT EXISTS state_events(
+    event_id VARCHAR(150) NOT NULL,
+    room_id VARCHAR(150) NOT NULL,
+    type VARCHAR(150) NOT NULL,
+    state_key VARCHAR(150) NOT NULL,
+    prev_state VARCHAR(150),
+    UNIQUE (event_id)
+);
+
+CREATE INDEX state_events_room_id ON state_events (room_id);
+CREATE INDEX state_events_type ON state_events (type);
+CREATE INDEX state_events_state_key ON state_events (state_key);
+
+
+CREATE TABLE IF NOT EXISTS current_state_events(
+    event_id VARCHAR(150) NOT NULL,
+    room_id VARCHAR(150) NOT NULL,
+    type VARCHAR(150) NOT NULL,
+    state_key VARCHAR(150) NOT NULL,
+    UNIQUE (event_id),
+    UNIQUE (room_id, type, state_key)
+);
+
+CREATE INDEX current_state_events_room_id ON current_state_events (room_id);
+CREATE INDEX current_state_events_type ON current_state_events (type);
+CREATE INDEX current_state_events_state_key ON current_state_events (state_key);
+
+CREATE TABLE IF NOT EXISTS room_memberships(
+    event_id VARCHAR(150) NOT NULL,
+    user_id VARCHAR(150) NOT NULL,
+    sender VARCHAR(150) NOT NULL,
+    room_id VARCHAR(150) NOT NULL,
+    membership VARCHAR(150) NOT NULL,
+    UNIQUE (event_id)
+);
+
+CREATE INDEX room_memberships_room_id ON room_memberships (room_id);
+CREATE INDEX room_memberships_user_id ON room_memberships (user_id);
+
+CREATE TABLE IF NOT EXISTS feedback(
+    event_id VARCHAR(150) NOT NULL,
+    feedback_type VARCHAR(150),
+    target_event_id VARCHAR(150),
+    sender VARCHAR(150),
+    room_id VARCHAR(150),
+    UNIQUE (event_id)
+);
+
+CREATE TABLE IF NOT EXISTS topics(
+    event_id VARCHAR(150) NOT NULL,
+    room_id VARCHAR(150) NOT NULL,
+    topic TEXT NOT NULL,
+    UNIQUE (event_id)
+);
+
+CREATE INDEX topics_room_id ON topics(room_id);
+
+CREATE TABLE IF NOT EXISTS room_names(
+    event_id VARCHAR(150) NOT NULL,
+    room_id VARCHAR(150) NOT NULL,
+    name TEXT NOT NULL,
+    UNIQUE (event_id)
+);
+
+CREATE INDEX room_names_room_id ON room_names(room_id);
+
+CREATE TABLE IF NOT EXISTS rooms(
+    room_id VARCHAR(150) PRIMARY KEY NOT NULL,
+    is_public BOOL,
+    creator VARCHAR(150)
+);
+
+CREATE TABLE IF NOT EXISTS room_hosts(
+    room_id VARCHAR(150) NOT NULL,
+    host VARCHAR(150) NOT NULL,
+    UNIQUE (room_id, host)
+);
+
+CREATE INDEX room_hosts_room_id ON room_hosts (room_id);
diff --git a/synapse/storage/schema/full_schemas/16/keys.sql b/synapse/storage/schema/full_schemas/16/keys.sql
new file mode 100644
index 0000000000..459b510427
--- /dev/null
+++ b/synapse/storage/schema/full_schemas/16/keys.sql
@@ -0,0 +1,31 @@
+/* Copyright 2014, 2015 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+CREATE TABLE IF NOT EXISTS server_tls_certificates(
+  server_name VARCHAR(150), -- Server name.
+  fingerprint VARCHAR(150), -- Certificate fingerprint.
+  from_server VARCHAR(150), -- Which key server the certificate was fetched from.
+  ts_added_ms BIGINT, -- When the certifcate was added.
+  tls_certificate bytea, -- DER encoded x509 certificate.
+  UNIQUE (server_name, fingerprint)
+);
+
+CREATE TABLE IF NOT EXISTS server_signature_keys(
+  server_name VARCHAR(150), -- Server name.
+  key_id VARCHAR(150), -- Key version.
+  from_server VARCHAR(150), -- Which key server the key was fetched form.
+  ts_added_ms BIGINT, -- When the key was added.
+  verify_key bytea, -- NACL verification key.
+  UNIQUE (server_name, key_id)
+);
diff --git a/synapse/storage/schema/full_schemas/16/media_repository.sql b/synapse/storage/schema/full_schemas/16/media_repository.sql
new file mode 100644
index 0000000000..0e819fca38
--- /dev/null
+++ b/synapse/storage/schema/full_schemas/16/media_repository.sql
@@ -0,0 +1,68 @@
+/* Copyright 2014, 2015 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE TABLE IF NOT EXISTS local_media_repository (
+    media_id VARCHAR(150), -- The id used to refer to the media.
+    media_type VARCHAR(150), -- The MIME-type of the media.
+    media_length INTEGER, -- Length of the media in bytes.
+    created_ts BIGINT, -- When the content was uploaded in ms.
+    upload_name VARCHAR(150), -- The name the media was uploaded with.
+    user_id VARCHAR(150), -- The user who uploaded the file.
+    UNIQUE (media_id)
+);
+
+CREATE TABLE IF NOT EXISTS local_media_repository_thumbnails (
+    media_id VARCHAR(150), -- The id used to refer to the media.
+    thumbnail_width INTEGER, -- The width of the thumbnail in pixels.
+    thumbnail_height INTEGER, -- The height of the thumbnail in pixels.
+    thumbnail_type VARCHAR(150), -- The MIME-type of the thumbnail.
+    thumbnail_method VARCHAR(150), -- The method used to make the thumbnail.
+    thumbnail_length INTEGER, -- The length of the thumbnail in bytes.
+    UNIQUE (
+        media_id, thumbnail_width, thumbnail_height, thumbnail_type
+    )
+);
+
+CREATE INDEX local_media_repository_thumbnails_media_id
+    ON local_media_repository_thumbnails (media_id);
+
+CREATE TABLE IF NOT EXISTS remote_media_cache (
+    media_origin VARCHAR(150), -- The remote HS the media came from.
+    media_id VARCHAR(150), -- The id used to refer to the media on that server.
+    media_type VARCHAR(150), -- The MIME-type of the media.
+    created_ts BIGINT, -- When the content was uploaded in ms.
+    upload_name VARCHAR(150), -- The name the media was uploaded with.
+    media_length INTEGER, -- Length of the media in bytes.
+    filesystem_id VARCHAR(150), -- The name used to store the media on disk.
+    UNIQUE (media_origin, media_id)
+);
+
+CREATE TABLE IF NOT EXISTS remote_media_cache_thumbnails (
+    media_origin VARCHAR(150), -- The remote HS the media came from.
+    media_id VARCHAR(150), -- The id used to refer to the media.
+    thumbnail_width INTEGER, -- The width of the thumbnail in pixels.
+    thumbnail_height INTEGER, -- The height of the thumbnail in pixels.
+    thumbnail_method VARCHAR(150), -- The method used to make the thumbnail
+    thumbnail_type VARCHAR(150), -- The MIME-type of the thumbnail.
+    thumbnail_length INTEGER, -- The length of the thumbnail in bytes.
+    filesystem_id VARCHAR(150), -- The name used to store the media on disk.
+    UNIQUE (
+        media_origin, media_id, thumbnail_width, thumbnail_height,
+        thumbnail_type
+     )
+);
+
+CREATE INDEX remote_media_cache_thumbnails_media_id
+    ON remote_media_cache_thumbnails (media_id);
diff --git a/synapse/storage/schema/full_schemas/16/presence.sql b/synapse/storage/schema/full_schemas/16/presence.sql
new file mode 100644
index 0000000000..9c41be296e
--- /dev/null
+++ b/synapse/storage/schema/full_schemas/16/presence.sql
@@ -0,0 +1,40 @@
+/* Copyright 2014, 2015 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+CREATE TABLE IF NOT EXISTS presence(
+  user_id VARCHAR(150) NOT NULL,
+  state VARCHAR(20),
+  status_msg VARCHAR(150),
+  mtime BIGINT, -- miliseconds since last state change
+  UNIQUE (user_id)
+);
+
+-- For each of /my/ users which possibly-remote users are allowed to see their
+-- presence state
+CREATE TABLE IF NOT EXISTS presence_allow_inbound(
+  observed_user_id VARCHAR(150) NOT NULL,
+  observer_user_id VARCHAR(150) NOT NULL, -- a UserID,
+  UNIQUE (observed_user_id, observer_user_id)
+);
+
+-- For each of /my/ users (watcher), which possibly-remote users are they
+-- watching?
+CREATE TABLE IF NOT EXISTS presence_list(
+  user_id VARCHAR(150) NOT NULL,
+  observed_user_id VARCHAR(150) NOT NULL, -- a UserID,
+  accepted BOOLEAN NOT NULL,
+  UNIQUE (user_id, observed_user_id)
+);
+
+CREATE INDEX presence_list_user_id ON presence_list (user_id);
diff --git a/synapse/storage/schema/full_schemas/16/profiles.sql b/synapse/storage/schema/full_schemas/16/profiles.sql
new file mode 100644
index 0000000000..21c58a99bc
--- /dev/null
+++ b/synapse/storage/schema/full_schemas/16/profiles.sql
@@ -0,0 +1,20 @@
+/* Copyright 2014, 2015 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+CREATE TABLE IF NOT EXISTS profiles(
+    user_id VARCHAR(150) NOT NULL,
+    displayname VARCHAR(150),
+    avatar_url VARCHAR(150),
+    UNIQUE(user_id)
+);
diff --git a/synapse/storage/schema/full_schemas/16/push.sql b/synapse/storage/schema/full_schemas/16/push.sql
new file mode 100644
index 0000000000..5c0c7bc201
--- /dev/null
+++ b/synapse/storage/schema/full_schemas/16/push.sql
@@ -0,0 +1,73 @@
+/* Copyright 2015 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE TABLE IF NOT EXISTS rejections(
+    event_id VARCHAR(150) NOT NULL,
+    reason VARCHAR(150) NOT NULL,
+    last_check VARCHAR(150) NOT NULL,
+    UNIQUE (event_id)
+);
+
+-- Push notification endpoints that users have configured
+CREATE TABLE IF NOT EXISTS pushers (
+  id BIGINT PRIMARY KEY,
+  user_name VARCHAR(150) NOT NULL,
+  profile_tag VARCHAR(32) NOT NULL,
+  kind VARCHAR(8) NOT NULL,
+  app_id VARCHAR(64) NOT NULL,
+  app_display_name VARCHAR(64) NOT NULL,
+  device_display_name VARCHAR(128) NOT NULL,
+  pushkey bytea NOT NULL,
+  ts BIGINT NOT NULL,
+  lang VARCHAR(8),
+  data bytea,
+  last_token TEXT,
+  last_success BIGINT,
+  failing_since BIGINT,
+  UNIQUE (app_id, pushkey)
+);
+
+CREATE TABLE IF NOT EXISTS push_rules (
+  id BIGINT PRIMARY KEY,
+  user_name VARCHAR(150) NOT NULL,
+  rule_id VARCHAR(150) NOT NULL,
+  priority_class SMALLINT NOT NULL,
+  priority INTEGER NOT NULL DEFAULT 0,
+  conditions VARCHAR(150) NOT NULL,
+  actions VARCHAR(150) NOT NULL,
+  UNIQUE(user_name, rule_id)
+);
+
+CREATE INDEX push_rules_user_name on push_rules (user_name);
+
+CREATE TABLE IF NOT EXISTS user_filters(
+  user_id VARCHAR(150),
+  filter_id BIGINT,
+  filter_json bytea
+);
+
+CREATE INDEX user_filters_by_user_id_filter_id ON user_filters(
+    user_id, filter_id
+);
+
+CREATE TABLE IF NOT EXISTS push_rules_enable (
+  id BIGINT PRIMARY KEY,
+  user_name VARCHAR(150) NOT NULL,
+  rule_id VARCHAR(150) NOT NULL,
+  enabled SMALLINT,
+  UNIQUE(user_name, rule_id)
+);
+
+CREATE INDEX push_rules_enable_user_name on push_rules_enable (user_name);
diff --git a/synapse/storage/schema/full_schemas/16/redactions.sql b/synapse/storage/schema/full_schemas/16/redactions.sql
new file mode 100644
index 0000000000..492fd22033
--- /dev/null
+++ b/synapse/storage/schema/full_schemas/16/redactions.sql
@@ -0,0 +1,22 @@
+/* Copyright 2014, 2015 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+CREATE TABLE IF NOT EXISTS redactions (
+    event_id VARCHAR(150) NOT NULL,
+    redacts VARCHAR(150) NOT NULL,
+    UNIQUE (event_id)
+);
+
+CREATE INDEX redactions_event_id ON redactions (event_id);
+CREATE INDEX redactions_redacts ON redactions (redacts);
diff --git a/synapse/storage/schema/full_schemas/16/room_aliases.sql b/synapse/storage/schema/full_schemas/16/room_aliases.sql
new file mode 100644
index 0000000000..2c0853a2a9
--- /dev/null
+++ b/synapse/storage/schema/full_schemas/16/room_aliases.sql
@@ -0,0 +1,29 @@
+/* Copyright 2014, 2015 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE TABLE IF NOT EXISTS room_aliases(
+    room_alias VARCHAR(150) NOT NULL,
+    room_id VARCHAR(150) NOT NULL,
+    UNIQUE (room_alias)
+);
+
+CREATE INDEX room_aliases_id ON room_aliases(room_id);
+
+CREATE TABLE IF NOT EXISTS room_alias_servers(
+    room_alias VARCHAR(150) NOT NULL,
+    server VARCHAR(150) NOT NULL
+);
+
+CREATE INDEX room_alias_servers_alias ON room_alias_servers(room_alias);
diff --git a/synapse/storage/schema/full_schemas/16/state.sql b/synapse/storage/schema/full_schemas/16/state.sql
new file mode 100644
index 0000000000..b0cd5ee75a
--- /dev/null
+++ b/synapse/storage/schema/full_schemas/16/state.sql
@@ -0,0 +1,40 @@
+/* Copyright 2014, 2015 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE TABLE IF NOT EXISTS state_groups(
+    id BIGINT PRIMARY KEY,
+    room_id VARCHAR(150) NOT NULL,
+    event_id VARCHAR(150) NOT NULL
+);
+
+CREATE TABLE IF NOT EXISTS state_groups_state(
+    state_group BIGINT NOT NULL,
+    room_id VARCHAR(150) NOT NULL,
+    type VARCHAR(150) NOT NULL,
+    state_key VARCHAR(150) NOT NULL,
+    event_id VARCHAR(150) NOT NULL
+);
+
+CREATE TABLE IF NOT EXISTS event_to_state_groups(
+    event_id VARCHAR(150) NOT NULL,
+    state_group BIGINT NOT NULL,
+    UNIQUE (event_id)
+);
+
+CREATE INDEX state_groups_id ON state_groups(id);
+
+CREATE INDEX state_groups_state_id ON state_groups_state(state_group);
+CREATE INDEX state_groups_state_tuple ON state_groups_state(room_id, type, state_key);
+CREATE INDEX event_to_state_groups_id ON event_to_state_groups(event_id);
diff --git a/synapse/storage/schema/full_schemas/16/transactions.sql b/synapse/storage/schema/full_schemas/16/transactions.sql
new file mode 100644
index 0000000000..ed431bd3af
--- /dev/null
+++ b/synapse/storage/schema/full_schemas/16/transactions.sql
@@ -0,0 +1,63 @@
+/* Copyright 2014, 2015 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+-- Stores what transaction ids we have received and what our response was
+CREATE TABLE IF NOT EXISTS received_transactions(
+    transaction_id VARCHAR(150),
+    origin VARCHAR(150),
+    ts BIGINT,
+    response_code INTEGER,
+    response_json bytea,
+    has_been_referenced smallint default 0, -- Whether thishas been referenced by a prev_tx
+    UNIQUE (transaction_id, origin)
+);
+
+CREATE INDEX transactions_have_ref ON received_transactions(origin, has_been_referenced);-- WHERE has_been_referenced = 0;
+
+
+-- Stores what transactions we've sent, what their response was (if we got one) and whether we have
+-- since referenced the transaction in another outgoing transaction
+CREATE TABLE IF NOT EXISTS sent_transactions(
+    id BIGINT PRIMARY KEY, -- This is used to apply insertion ordering
+    transaction_id VARCHAR(150),
+    destination VARCHAR(150),
+    response_code INTEGER DEFAULT 0,
+    response_json TEXT,
+    ts BIGINT
+);
+
+CREATE INDEX sent_transaction_dest ON sent_transactions(destination);
+CREATE INDEX sent_transaction_txn_id ON sent_transactions(transaction_id);
+-- So that we can do an efficient look up of all transactions that have yet to be successfully
+-- sent.
+CREATE INDEX sent_transaction_sent ON sent_transactions(response_code);
+
+
+-- For sent transactions only.
+CREATE TABLE IF NOT EXISTS transaction_id_to_pdu(
+    transaction_id INTEGER,
+    destination VARCHAR(150),
+    pdu_id VARCHAR(150),
+    pdu_origin VARCHAR(150),
+    UNIQUE (transaction_id, destination)
+);
+
+CREATE INDEX transaction_id_to_pdu_dest ON transaction_id_to_pdu(destination);
+
+-- To track destination health
+CREATE TABLE IF NOT EXISTS destinations(
+    destination VARCHAR(150) PRIMARY KEY,
+    retry_last_ts BIGINT,
+    retry_interval INTEGER
+);
diff --git a/synapse/storage/schema/full_schemas/16/users.sql b/synapse/storage/schema/full_schemas/16/users.sql
new file mode 100644
index 0000000000..033e3244b5
--- /dev/null
+++ b/synapse/storage/schema/full_schemas/16/users.sql
@@ -0,0 +1,42 @@
+/* Copyright 2014, 2015 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+CREATE TABLE IF NOT EXISTS users(
+    name VARCHAR(150),
+    password_hash VARCHAR(150),
+    creation_ts BIGINT,
+    admin SMALLINT DEFAULT 0 NOT NULL,
+    UNIQUE(name)
+);
+
+CREATE TABLE IF NOT EXISTS access_tokens(
+    id BIGINT PRIMARY KEY,
+    user_id VARCHAR(150) NOT NULL,
+    device_id VARCHAR(150),
+    token VARCHAR(150) NOT NULL,
+    last_used BIGINT,
+    UNIQUE(token)
+);
+
+CREATE TABLE IF NOT EXISTS user_ips (
+    user_id VARCHAR(150) NOT NULL,
+    access_token VARCHAR(150) NOT NULL,
+    device_id VARCHAR(150),
+    ip VARCHAR(150) NOT NULL,
+    user_agent TEXT NOT NULL,
+    last_seen BIGINT NOT NULL
+);
+
+CREATE INDEX user_ips_user ON user_ips(user_id);
+CREATE INDEX user_ips_user_ip ON user_ips(user_id, access_token, ip);
diff --git a/synapse/storage/schema/schema_version.sql b/synapse/storage/schema/schema_version.sql
index 0431e2d051..d9494611e0 100644
--- a/synapse/storage/schema/schema_version.sql
+++ b/synapse/storage/schema/schema_version.sql
@@ -14,17 +14,14 @@
  */
 
 CREATE TABLE IF NOT EXISTS schema_version(
-    Lock char(1) NOT NULL DEFAULT 'X',  -- Makes sure this table only has one row.
+    Lock CHAR(1) NOT NULL DEFAULT 'X' UNIQUE,  -- Makes sure this table only has one row.
     version INTEGER NOT NULL,
     upgraded BOOL NOT NULL,  -- Whether we reached this version from an upgrade or an initial schema.
-    CONSTRAINT schema_version_lock_x CHECK (Lock='X')
-    CONSTRAINT schema_version_lock_uniq UNIQUE (Lock)
+    CHECK (Lock='X')
 );
 
 CREATE TABLE IF NOT EXISTS applied_schema_deltas(
     version INTEGER NOT NULL,
-    file TEXT NOT NULL,
-    CONSTRAINT schema_deltas_ver_file UNIQUE (version, file) ON CONFLICT IGNORE
+    file VARCHAR(150) NOT NULL,
+    UNIQUE(version, file)
 );
-
-CREATE INDEX IF NOT EXISTS schema_deltas_ver ON applied_schema_deltas(version);
diff --git a/synapse/storage/signatures.py b/synapse/storage/signatures.py
index d0d53770f2..f051828630 100644
--- a/synapse/storage/signatures.py
+++ b/synapse/storage/signatures.py
@@ -56,7 +56,6 @@ class SignatureStore(SQLBaseStore):
                 "algorithm": algorithm,
                 "hash": buffer(hash_bytes),
             },
-            or_ignore=True,
         )
 
     def get_event_reference_hashes(self, event_ids):
@@ -100,7 +99,7 @@ class SignatureStore(SQLBaseStore):
             " WHERE event_id = ?"
         )
         txn.execute(query, (event_id, ))
-        return dict(txn.fetchall())
+        return {k: v for k, v in txn.fetchall()}
 
     def _store_event_reference_hash_txn(self, txn, event_id, algorithm,
                                         hash_bytes):
@@ -119,7 +118,6 @@ class SignatureStore(SQLBaseStore):
                 "algorithm": algorithm,
                 "hash": buffer(hash_bytes),
             },
-            or_ignore=True,
         )
 
     def _get_event_signatures_txn(self, txn, event_id):
@@ -164,7 +162,6 @@ class SignatureStore(SQLBaseStore):
                 "key_id": key_id,
                 "signature": buffer(signature_bytes),
             },
-            or_ignore=True,
         )
 
     def _get_prev_event_hashes_txn(self, txn, event_id):
@@ -198,5 +195,4 @@ class SignatureStore(SQLBaseStore):
                 "algorithm": algorithm,
                 "hash": buffer(hash_bytes),
             },
-            or_ignore=True,
         )
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index 58dbf2802b..553ba9dd1f 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -17,6 +17,8 @@ from ._base import SQLBaseStore
 
 from twisted.internet import defer
 
+from synapse.util.stringutils import random_string
+
 import logging
 
 logger = logging.getLogger(__name__)
@@ -91,14 +93,15 @@ class StateStore(SQLBaseStore):
 
         state_group = context.state_group
         if not state_group:
-            state_group = self._simple_insert_txn(
+            state_group = self._state_groups_id_gen.get_next_txn(txn)
+            self._simple_insert_txn(
                 txn,
                 table="state_groups",
                 values={
+                    "id": state_group,
                     "room_id": event.room_id,
                     "event_id": event.event_id,
                 },
-                or_ignore=True,
             )
 
             for state in state_events.values():
@@ -112,7 +115,6 @@ class StateStore(SQLBaseStore):
                         "state_key": state.state_key,
                         "event_id": state.event_id,
                     },
-                    or_ignore=True,
                 )
 
         self._simple_insert_txn(
@@ -122,7 +124,6 @@ class StateStore(SQLBaseStore):
                 "state_group": state_group,
                 "event_id": event.event_id,
             },
-            or_replace=True,
         )
 
     @defer.inlineCallbacks
@@ -154,3 +155,7 @@ class StateStore(SQLBaseStore):
 
         events = yield self._parse_events(results)
         defer.returnValue(events)
+
+
+def _make_group_id(clock):
+    return str(int(clock.time_msec())) + random_string(5)
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index 66f307e640..df6de7cbcd 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -35,7 +35,7 @@ what sort order was used:
 
 from twisted.internet import defer
 
-from ._base import SQLBaseStore, cached
+from ._base import SQLBaseStore
 from synapse.api.constants import EventTypes
 from synapse.api.errors import SynapseError
 from synapse.util.logutils import log_function
@@ -110,7 +110,7 @@ class _StreamToken(namedtuple("_StreamToken", "topological stream")):
         if self.topological is None:
             return "(%d < %s)" % (self.stream, "stream_ordering")
         else:
-            return "(%d < %s OR (%d == %s AND %d < %s))" % (
+            return "(%d < %s OR (%d = %s AND %d < %s))" % (
                 self.topological, "topological_ordering",
                 self.topological, "topological_ordering",
                 self.stream, "stream_ordering",
@@ -120,7 +120,7 @@ class _StreamToken(namedtuple("_StreamToken", "topological stream")):
         if self.topological is None:
             return "(%d >= %s)" % (self.stream, "stream_ordering")
         else:
-            return "(%d > %s OR (%d == %s AND %d >= %s))" % (
+            return "(%d > %s OR (%d = %s AND %d >= %s))" % (
                 self.topological, "topological_ordering",
                 self.topological, "topological_ordering",
                 self.stream, "stream_ordering",
@@ -240,7 +240,7 @@ class StreamStore(SQLBaseStore):
 
         sql = (
             "SELECT e.event_id, e.stream_ordering FROM events AS e WHERE "
-            "(e.outlier = 0 AND (room_id IN (%(current)s)) OR "
+            "(e.outlier = ? AND (room_id IN (%(current)s)) OR "
             "(event_id IN (%(invites)s))) "
             "AND e.stream_ordering > ? AND e.stream_ordering <= ? "
             "ORDER BY stream_ordering ASC LIMIT %(limit)d "
@@ -251,7 +251,7 @@ class StreamStore(SQLBaseStore):
         }
 
         def f(txn):
-            txn.execute(sql, (user_id, user_id, from_id.stream, to_id.stream,))
+            txn.execute(sql, (False, user_id, user_id, from_id.stream, to_id.stream,))
 
             rows = self.cursor_to_dict(txn)
 
@@ -283,7 +283,7 @@ class StreamStore(SQLBaseStore):
         # Tokens really represent positions between elements, but we use
         # the convention of pointing to the event before the gap. Hence
         # we have a bit of asymmetry when it comes to equalities.
-        args = [room_id]
+        args = [False, room_id]
         if direction == 'b':
             order = "DESC"
             bounds = _StreamToken.parse(from_key).upper_bound()
@@ -307,7 +307,7 @@ class StreamStore(SQLBaseStore):
 
         sql = (
             "SELECT * FROM events"
-            " WHERE outlier = 0 AND room_id = ? AND %(bounds)s"
+            " WHERE outlier = ? AND room_id = ? AND %(bounds)s"
             " ORDER BY topological_ordering %(order)s,"
             " stream_ordering %(order)s %(limit)s"
         ) % {
@@ -358,7 +358,7 @@ class StreamStore(SQLBaseStore):
             sql = (
                 "SELECT stream_ordering, topological_ordering, event_id"
                 " FROM events"
-                " WHERE room_id = ? AND stream_ordering <= ? AND outlier = 0"
+                " WHERE room_id = ? AND stream_ordering <= ? AND outlier = ?"
                 " ORDER BY topological_ordering DESC, stream_ordering DESC"
                 " LIMIT ?"
             )
@@ -368,17 +368,17 @@ class StreamStore(SQLBaseStore):
                 "SELECT stream_ordering, topological_ordering, event_id"
                 " FROM events"
                 " WHERE room_id = ? AND stream_ordering > ?"
-                " AND stream_ordering <= ? AND outlier = 0"
+                " AND stream_ordering <= ? AND outlier = ?"
                 " ORDER BY topological_ordering DESC, stream_ordering DESC"
                 " LIMIT ?"
             )
 
         def get_recent_events_for_room_txn(txn):
             if from_token is None:
-                txn.execute(sql, (room_id, end_token.stream, limit,))
+                txn.execute(sql, (room_id, end_token.stream, False, limit,))
             else:
                 txn.execute(sql, (
-                    room_id, from_token.stream, end_token.stream, limit
+                    room_id, from_token.stream, end_token.stream, False, limit
                 ))
 
             rows = self.cursor_to_dict(txn)
@@ -413,12 +413,10 @@ class StreamStore(SQLBaseStore):
             "get_recent_events_for_room", get_recent_events_for_room_txn
         )
 
-    @cached(num_args=0)
+    @defer.inlineCallbacks
     def get_room_events_max_id(self):
-        return self.runInteraction(
-            "get_room_events_max_id",
-            self._get_room_events_max_id_txn
-        )
+        token = yield self._stream_id_gen.get_max_token(self)
+        defer.returnValue("s%d" % (token,))
 
     @defer.inlineCallbacks
     def _get_min_token(self):
@@ -433,27 +431,6 @@ class StreamStore(SQLBaseStore):
 
         defer.returnValue(self.min_token)
 
-    def get_next_stream_id(self):
-        with self._next_stream_id_lock:
-            i = self._next_stream_id
-            self._next_stream_id += 1
-            return i
-
-    def _get_room_events_max_id_txn(self, txn):
-        txn.execute(
-            "SELECT MAX(stream_ordering) as m FROM events"
-        )
-
-        res = self.cursor_to_dict(txn)
-
-        logger.debug("get_room_events_max_id: %s", res)
-
-        if not res or not res[0] or not res[0]["m"]:
-            return "s0"
-
-        key = res[0]["m"]
-        return "s%d" % (key,)
-
     @staticmethod
     def _set_before_and_after(events, rows):
         for event, row in zip(events, rows):
diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py
index b777395e06..7e3add5280 100644
--- a/synapse/storage/transactions.py
+++ b/synapse/storage/transactions.py
@@ -13,7 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from ._base import SQLBaseStore, Table, cached
+from ._base import SQLBaseStore, cached
 
 from collections import namedtuple
 
@@ -84,13 +84,18 @@ class TransactionStore(SQLBaseStore):
 
     def _set_received_txn_response(self, txn, transaction_id, origin, code,
                                    response_json):
-        query = (
-            "UPDATE %s "
-            "SET response_code = ?, response_json = ? "
-            "WHERE transaction_id = ? AND origin = ?"
-        ) % ReceivedTransactionsTable.table_name
-
-        txn.execute(query, (code, response_json, transaction_id, origin))
+        self._simple_upsert_txn(
+            txn,
+            table=ReceivedTransactionsTable.table_name,
+            keyvalues={
+                "transaction_id": transaction_id,
+                "origin": origin,
+            },
+            values={
+                "response_code": code,
+                "response_json": response_json,
+            }
+        )
 
     def prep_send_transaction(self, transaction_id, destination,
                               origin_server_ts):
@@ -118,41 +123,38 @@ class TransactionStore(SQLBaseStore):
     def _prep_send_transaction(self, txn, transaction_id, destination,
                                origin_server_ts):
 
+        next_id = self._transaction_id_gen.get_next_txn(txn)
+
         # First we find out what the prev_txns should be.
         # Since we know that we are only sending one transaction at a time,
         # we can simply take the last one.
-        query = "%s ORDER BY id DESC LIMIT 1" % (
-                SentTransactions.select_statement("destination = ?"),
-            )
+        query = (
+            "SELECT * FROM sent_transactions"
+            " WHERE destination = ?"
+            " ORDER BY id DESC LIMIT 1"
+        )
 
-        results = txn.execute(query, (destination,))
-        results = SentTransactions.decode_results(results)
+        txn.execute(query, (destination,))
+        results = self.cursor_to_dict(txn)
 
-        prev_txns = [r.transaction_id for r in results]
+        prev_txns = [r["transaction_id"] for r in results]
 
         # Actually add the new transaction to the sent_transactions table.
 
-        query = SentTransactions.insert_statement()
-        txn.execute(query, SentTransactions.EntryType(
-            None,
-            transaction_id=transaction_id,
-            destination=destination,
-            ts=origin_server_ts,
-            response_code=0,
-            response_json=None
-        ))
-
-        # Update the tx id -> pdu id mapping
-
-        # values = [
-        #     (transaction_id, destination, pdu[0], pdu[1])
-        #     for pdu in pdu_list
-        # ]
-        #
-        # logger.debug("Inserting: %s", repr(values))
-        #
-        # query = TransactionsToPduTable.insert_statement()
-        # txn.executemany(query, values)
+        self._simple_insert_txn(
+            txn,
+            table=SentTransactions.table_name,
+            values={
+                "id": next_id,
+                "transaction_id": transaction_id,
+                "destination": destination,
+                "ts": origin_server_ts,
+                "response_code": 0,
+                "response_json": None,
+            }
+        )
+
+        # TODO Update the tx id -> pdu id mapping
 
         return prev_txns
 
@@ -171,15 +173,20 @@ class TransactionStore(SQLBaseStore):
             transaction_id, destination, code, response_dict
         )
 
-    def _delivered_txn(cls, txn, transaction_id, destination,
+    def _delivered_txn(self, txn, transaction_id, destination,
                        code, response_json):
-        query = (
-            "UPDATE %s "
-            "SET response_code = ?, response_json = ? "
-            "WHERE transaction_id = ? AND destination = ?"
-        ) % SentTransactions.table_name
-
-        txn.execute(query, (code, response_json, transaction_id, destination))
+        self._simple_update_one_txn(
+            txn,
+            table=SentTransactions.table_name,
+            keyvalues={
+                "transaction_id": transaction_id,
+                "destination": destination,
+            },
+            updatevalues={
+                "response_code": code,
+                "response_json": None,  # For now, don't persist response_json
+            }
+        )
 
     def get_transactions_after(self, transaction_id, destination):
         """Get all transactions after a given local transaction_id.
@@ -189,25 +196,26 @@ class TransactionStore(SQLBaseStore):
             destination (str)
 
         Returns:
-            list: A list of `ReceivedTransactionsTable.EntryType`
+            list: A list of dicts
         """
         return self.runInteraction(
             "get_transactions_after",
             self._get_transactions_after, transaction_id, destination
         )
 
-    def _get_transactions_after(cls, txn, transaction_id, destination):
-        where = (
-            "destination = ? AND id > (select id FROM %s WHERE "
-            "transaction_id = ? AND destination = ?)"
-        ) % (
-            SentTransactions.table_name
+    def _get_transactions_after(self, txn, transaction_id, destination):
+        query = (
+            "SELECT * FROM sent_transactions"
+            " WHERE destination = ? AND id >"
+            " ("
+            " SELECT id FROM sent_transactions"
+            " WHERE transaction_id = ? AND destination = ?"
+            " )"
         )
-        query = SentTransactions.select_statement(where)
 
         txn.execute(query, (destination, transaction_id, destination))
 
-        return ReceivedTransactionsTable.decode_results(txn.fetchall())
+        return self.cursor_to_dict(txn)
 
     @cached()
     def get_destination_retry_timings(self, destination):
@@ -218,22 +226,27 @@ class TransactionStore(SQLBaseStore):
 
         Returns:
             None if not retrying
-            Otherwise a DestinationsTable.EntryType for the retry scheme
+            Otherwise a dict for the retry scheme
         """
         return self.runInteraction(
             "get_destination_retry_timings",
             self._get_destination_retry_timings, destination)
 
-    def _get_destination_retry_timings(cls, txn, destination):
-        query = DestinationsTable.select_statement("destination = ?")
-        txn.execute(query, (destination,))
-        result = txn.fetchall()
-        if result:
-            result = DestinationsTable.decode_single_result(result)
-            if result.retry_last_ts > 0:
-                return result
-            else:
-                return None
+    def _get_destination_retry_timings(self, txn, destination):
+        result = self._simple_select_one_txn(
+            txn,
+            table=DestinationsTable.table_name,
+            keyvalues={
+                "destination": destination,
+            },
+            retcols=DestinationsTable.fields,
+            allow_none=True,
+        )
+
+        if result and result["retry_last_ts"] > 0:
+            return result
+        else:
+            return None
 
     def set_destination_retry_timings(self, destination,
                                       retry_last_ts, retry_interval):
@@ -249,11 +262,11 @@ class TransactionStore(SQLBaseStore):
         # As this is the new value, we might as well prefill the cache
         self.get_destination_retry_timings.prefill(
             destination,
-            DestinationsTable.EntryType(
-                destination,
-                retry_last_ts,
-                retry_interval
-            )
+            {
+                "destination": destination,
+                "retry_last_ts": retry_last_ts,
+                "retry_interval": retry_interval
+            },
         )
 
         # XXX: we could chose to not bother persisting this if our cache thinks
@@ -266,22 +279,38 @@ class TransactionStore(SQLBaseStore):
             retry_interval,
         )
 
-    def _set_destination_retry_timings(cls, txn, destination,
+    def _set_destination_retry_timings(self, txn, destination,
                                        retry_last_ts, retry_interval):
-
         query = (
-            "INSERT OR REPLACE INTO %s "
-            "(destination, retry_last_ts, retry_interval) "
-            "VALUES (?, ?, ?) "
-        ) % DestinationsTable.table_name
+            "UPDATE destinations"
+            " SET retry_last_ts = ?, retry_interval = ?"
+            " WHERE destination = ?"
+        )
+
+        txn.execute(
+            query,
+            (
+                retry_last_ts, retry_interval, destination,
+            )
+        )
 
-        txn.execute(query, (destination, retry_last_ts, retry_interval))
+        if txn.rowcount == 0:
+            # destination wasn't already in table. Insert it.
+            self._simple_insert_txn(
+                txn,
+                table="destinations",
+                values={
+                    "destination": destination,
+                    "retry_last_ts": retry_last_ts,
+                    "retry_interval": retry_interval,
+                }
+            )
 
     def get_destinations_needing_retry(self):
         """Get all destinations which are due a retry for sending a transaction.
 
         Returns:
-            list: A list of `DestinationsTable.EntryType`
+            list: A list of dicts
         """
 
         return self.runInteraction(
@@ -289,14 +318,17 @@ class TransactionStore(SQLBaseStore):
             self._get_destinations_needing_retry
         )
 
-    def _get_destinations_needing_retry(cls, txn):
-        where = "retry_last_ts > 0 and retry_next_ts < now()"
-        query = DestinationsTable.select_statement(where)
-        txn.execute(query)
-        return DestinationsTable.decode_results(txn.fetchall())
+    def _get_destinations_needing_retry(self, txn):
+        query = (
+            "SELECT * FROM destinations"
+            " WHERE retry_last_ts > 0 and retry_next_ts < ?"
+        )
+
+        txn.execute(query, (self._clock.time_msec(),))
+        return self.cursor_to_dict(txn)
 
 
-class ReceivedTransactionsTable(Table):
+class ReceivedTransactionsTable(object):
     table_name = "received_transactions"
 
     fields = [
@@ -308,10 +340,8 @@ class ReceivedTransactionsTable(Table):
         "has_been_referenced",
     ]
 
-    EntryType = namedtuple("ReceivedTransactionsEntry", fields)
 
-
-class SentTransactions(Table):
+class SentTransactions(object):
     table_name = "sent_transactions"
 
     fields = [
@@ -326,7 +356,7 @@ class SentTransactions(Table):
     EntryType = namedtuple("SentTransactionsEntry", fields)
 
 
-class TransactionsToPduTable(Table):
+class TransactionsToPduTable(object):
     table_name = "transaction_id_to_pdu"
 
     fields = [
@@ -336,10 +366,8 @@ class TransactionsToPduTable(Table):
         "pdu_origin",
     ]
 
-    EntryType = namedtuple("TransactionsToPduEntry", fields)
-
 
-class DestinationsTable(Table):
+class DestinationsTable(object):
     table_name = "destinations"
 
     fields = [
@@ -347,5 +375,3 @@ class DestinationsTable(Table):
         "retry_last_ts",
         "retry_interval",
     ]
-
-    EntryType = namedtuple("DestinationsEntry", fields)
diff --git a/synapse/storage/util/__init__.py b/synapse/storage/util/__init__.py
new file mode 100644
index 0000000000..c488b10d3c
--- /dev/null
+++ b/synapse/storage/util/__init__.py
@@ -0,0 +1,14 @@
+# -*- coding: utf-8 -*-
+# Copyright 2014, 2015 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py
new file mode 100644
index 0000000000..e5dec1c948
--- /dev/null
+++ b/synapse/storage/util/id_generators.py
@@ -0,0 +1,130 @@
+# -*- coding: utf-8 -*-
+# Copyright 2014, 2015 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from twisted.internet import defer
+
+from collections import deque
+import contextlib
+import threading
+
+
+class IdGenerator(object):
+    def __init__(self, table, column, store):
+        self.table = table
+        self.column = column
+        self.store = store
+        self._lock = threading.Lock()
+        self._next_id = None
+
+    @defer.inlineCallbacks
+    def get_next(self):
+        with self._lock:
+            if not self._next_id:
+                res = yield self.store._execute_and_decode(
+                    "IdGenerator_%s" % (self.table,),
+                    "SELECT MAX(%s) as mx FROM %s" % (self.column, self.table,)
+                )
+
+                self._next_id = (res and res[0] and res[0]["mx"]) or 1
+
+            i = self._next_id
+            self._next_id += 1
+            defer.returnValue(i)
+
+    def get_next_txn(self, txn):
+        with self._lock:
+            if self._next_id:
+                i = self._next_id
+                self._next_id += 1
+                return i
+            else:
+                txn.execute(
+                    "SELECT MAX(%s) FROM %s" % (self.column, self.table,)
+                )
+
+                val, = txn.fetchone()
+                cur = val or 0
+                cur += 1
+                self._next_id = cur + 1
+
+                return cur
+
+
+class StreamIdGenerator(object):
+    """Used to generate new stream ids when persisting events while keeping
+    track of which transactions have been completed.
+
+    This allows us to get the "current" stream id, i.e. the stream id such that
+    all ids less than or equal to it have completed. This handles the fact that
+    persistence of events can complete out of order.
+
+    Usage:
+        with stream_id_gen.get_next_txn(txn) as stream_id:
+            # ... persist event ...
+    """
+    def __init__(self):
+        self._lock = threading.Lock()
+
+        self._current_max = None
+        self._unfinished_ids = deque()
+
+    def get_next_txn(self, txn):
+        """
+        Usage:
+            with stream_id_gen.get_next_txn(txn) as stream_id:
+                # ... persist event ...
+        """
+        with self._lock:
+            if not self._current_max:
+                self._compute_current_max(txn)
+
+            self._current_max += 1
+            next_id = self._current_max
+
+            self._unfinished_ids.append(next_id)
+
+        @contextlib.contextmanager
+        def manager():
+            try:
+                yield next_id
+            finally:
+                with self._lock:
+                    self._unfinished_ids.remove(next_id)
+
+        return manager()
+
+    def get_max_token(self, store):
+        """Returns the maximum stream id such that all stream ids less than or
+        equal to it have been successfully persisted.
+        """
+        with self._lock:
+            if self._unfinished_ids:
+                return self._unfinished_ids[0] - 1
+
+            if not self._current_max:
+                return store.runInteraction(
+                    "_compute_current_max",
+                    self._compute_current_max,
+                )
+
+            return self._current_max
+
+    def _compute_current_max(self, txn):
+        txn.execute("SELECT MAX(stream_ordering) FROM events")
+        val, = txn.fetchone()
+
+        self._current_max = int(val) if val else 1
+
+        return self._current_max
diff --git a/synapse/util/lrucache.py b/synapse/util/lrucache.py
index 2f7b615f78..96163c90f1 100644
--- a/synapse/util/lrucache.py
+++ b/synapse/util/lrucache.py
@@ -14,6 +14,10 @@
 # limitations under the License.
 
 
+from functools import wraps
+import threading
+
+
 class LruCache(object):
     """Least-recently-used cache."""
     # TODO(mjark) Add mutex for linked list for thread safety.
@@ -24,6 +28,16 @@ class LruCache(object):
 
         PREV, NEXT, KEY, VALUE = 0, 1, 2, 3
 
+        lock = threading.Lock()
+
+        def synchronized(f):
+            @wraps(f)
+            def inner(*args, **kwargs):
+                with lock:
+                    return f(*args, **kwargs)
+
+            return inner
+
         def add_node(key, value):
             prev_node = list_root
             next_node = prev_node[NEXT]
@@ -51,6 +65,7 @@ class LruCache(object):
             next_node[PREV] = prev_node
             cache.pop(node[KEY], None)
 
+        @synchronized
         def cache_get(key, default=None):
             node = cache.get(key, None)
             if node is not None:
@@ -59,6 +74,7 @@ class LruCache(object):
             else:
                 return default
 
+        @synchronized
         def cache_set(key, value):
             node = cache.get(key, None)
             if node is not None:
@@ -69,6 +85,7 @@ class LruCache(object):
                 if len(cache) > max_size:
                     delete_node(list_root[PREV])
 
+        @synchronized
         def cache_set_default(key, value):
             node = cache.get(key, None)
             if node is not None:
@@ -79,6 +96,7 @@ class LruCache(object):
                     delete_node(list_root[PREV])
                 return value
 
+        @synchronized
         def cache_pop(key, default=None):
             node = cache.get(key, None)
             if node:
@@ -87,9 +105,11 @@ class LruCache(object):
             else:
                 return default
 
+        @synchronized
         def cache_len():
             return len(cache)
 
+        @synchronized
         def cache_contains(key):
             return key in cache
 
diff --git a/synapse/util/retryutils.py b/synapse/util/retryutils.py
index 4e82232796..a42138f556 100644
--- a/synapse/util/retryutils.py
+++ b/synapse/util/retryutils.py
@@ -60,7 +60,7 @@ def get_retry_limiter(destination, clock, store, **kwargs):
 
     if retry_timings:
         retry_last_ts, retry_interval = (
-            retry_timings.retry_last_ts, retry_timings.retry_interval
+            retry_timings["retry_last_ts"], retry_timings["retry_interval"]
         )
 
         now = int(clock.time_msec())
diff --git a/tests/federation/test_federation.py b/tests/federation/test_federation.py
index 2ecd00d2ad..a4ef60b911 100644
--- a/tests/federation/test_federation.py
+++ b/tests/federation/test_federation.py
@@ -24,8 +24,6 @@ from ..utils import MockHttpResource, MockClock, setup_test_homeserver
 from synapse.federation import initialize_http_replication
 from synapse.events import FrozenEvent
 
-from synapse.storage.transactions import DestinationsTable
-
 
 def make_pdu(prev_pdus=[], **kwargs):
     """Provide some default fields for making a PduTuple."""
@@ -57,8 +55,14 @@ class FederationTestCase(unittest.TestCase):
         self.mock_persistence.get_received_txn_response.return_value = (
             defer.succeed(None)
         )
+
+        retry_timings_res = {
+            "destination": "",
+            "retry_last_ts": 0,
+            "retry_interval": 0,
+        }
         self.mock_persistence.get_destination_retry_timings.return_value = (
-            defer.succeed(DestinationsTable.EntryType("", 0, 0))
+            defer.succeed(retry_timings_res)
         )
         self.mock_persistence.get_auth_chain.return_value = []
         self.clock = MockClock()
diff --git a/tests/handlers/test_federation.py b/tests/handlers/test_federation.py
index c13ade3286..08d2404b6c 100644
--- a/tests/handlers/test_federation.py
+++ b/tests/handlers/test_federation.py
@@ -87,6 +87,15 @@ class FederationTestCase(unittest.TestCase):
         self.datastore.get_room.return_value = defer.succeed(True)
         self.auth.check_host_in_room.return_value = defer.succeed(True)
 
+        retry_timings_res = {
+            "destination": "",
+            "retry_last_ts": 0,
+            "retry_interval": 0,
+        }
+        self.datastore.get_destination_retry_timings.return_value = (
+            defer.succeed(retry_timings_res)
+        )
+
         def have_events(event_ids):
             return defer.succeed({})
         self.datastore.have_events.side_effect = have_events
diff --git a/tests/handlers/test_presence.py b/tests/handlers/test_presence.py
index 04eba4289e..9b0e606918 100644
--- a/tests/handlers/test_presence.py
+++ b/tests/handlers/test_presence.py
@@ -194,8 +194,13 @@ class MockedDatastorePresenceTestCase(PresenceTestCase):
         return datastore
 
     def setUp_datastore_federation_mocks(self, datastore):
+        retry_timings_res = {
+            "destination": "",
+            "retry_last_ts": 0,
+            "retry_interval": 0,
+        }
         datastore.get_destination_retry_timings.return_value = (
-            defer.succeed(DestinationsTable.EntryType("", 0, 0))
+            defer.succeed(retry_timings_res)
         )
 
         def get_received_txn_response(*args):
diff --git a/tests/handlers/test_typing.py b/tests/handlers/test_typing.py
index 91d4102fee..b318d4944a 100644
--- a/tests/handlers/test_typing.py
+++ b/tests/handlers/test_typing.py
@@ -96,8 +96,13 @@ class TypingNotificationsTestCase(unittest.TestCase):
         self.event_source = hs.get_event_sources().sources["typing"]
 
         self.datastore = hs.get_datastore()
+        retry_timings_res = {
+            "destination": "",
+            "retry_last_ts": 0,
+            "retry_interval": 0,
+        }
         self.datastore.get_destination_retry_timings.return_value = (
-            defer.succeed(DestinationsTable.EntryType("", 0, 0))
+            defer.succeed(retry_timings_res)
         )
 
         def get_received_txn_response(*args):
diff --git a/tests/rest/client/v1/test_events.py b/tests/rest/client/v1/test_events.py
index 36b0f2ff6d..445272e323 100644
--- a/tests/rest/client/v1/test_events.py
+++ b/tests/rest/client/v1/test_events.py
@@ -115,12 +115,6 @@ class EventStreamPermissionsTestCase(RestTestCase):
         hs = yield setup_test_homeserver(
             http_client=None,
             replication_layer=Mock(),
-            clock=Mock(spec=[
-                "call_later",
-                "cancel_call_later",
-                "time_msec",
-                "time"
-            ]),
             ratelimiter=NonCallableMock(spec_set=[
                 "send_message",
             ]),
@@ -132,9 +126,6 @@ class EventStreamPermissionsTestCase(RestTestCase):
 
         hs.get_handlers().federation_handler = Mock()
 
-        hs.get_clock().time_msec.return_value = 1000000
-        hs.get_clock().time.return_value = 1000
-
         synapse.rest.client.v1.register.register_servlets(hs, self.mock_resource)
         synapse.rest.client.v1.events.register_servlets(hs, self.mock_resource)
         synapse.rest.client.v1.room.register_servlets(hs, self.mock_resource)
diff --git a/tests/storage/test_appservice.py b/tests/storage/test_appservice.py
index 675959c56c..77376b348e 100644
--- a/tests/storage/test_appservice.py
+++ b/tests/storage/test_appservice.py
@@ -15,6 +15,7 @@
 from tests import unittest
 from twisted.internet import defer
 
+from tests.utils import setup_test_homeserver
 from synapse.appservice import ApplicationService, ApplicationServiceState
 from synapse.server import HomeServer
 from synapse.storage.appservice import (
@@ -33,14 +34,10 @@ class ApplicationServiceStoreTestCase(unittest.TestCase):
     @defer.inlineCallbacks
     def setUp(self):
         self.as_yaml_files = []
-        db_pool = SQLiteMemoryDbPool()
-        yield db_pool.prepare()
-        hs = HomeServer(
-            "test", db_pool=db_pool, clock=MockClock(),
-            config=Mock(
-                app_service_config_files=self.as_yaml_files
-            )
+        config = Mock(
+            app_service_config_files=self.as_yaml_files
         )
+        hs = yield setup_test_homeserver(config=config)
 
         self.as_token = "token1"
         self.as_url = "some_url"
@@ -102,8 +99,13 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase):
     @defer.inlineCallbacks
     def setUp(self):
         self.as_yaml_files = []
-        self.db_pool = SQLiteMemoryDbPool()
-        yield self.db_pool.prepare()
+
+        config = Mock(
+            app_service_config_files=self.as_yaml_files
+        )
+        hs = yield setup_test_homeserver(config=config)
+        self.db_pool = hs.get_db_pool()
+
         self.as_list = [
             {
                 "token": "token1",
@@ -129,11 +131,8 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase):
         for s in self.as_list:
             yield self._add_service(s["url"], s["token"])
 
-        hs = HomeServer(
-            "test", db_pool=self.db_pool, clock=MockClock(), config=Mock(
-                app_service_config_files=self.as_yaml_files
-            )
-        )
+        self.as_yaml_files = []
+
         self.store = TestTransactionStore(hs)
 
     def _add_service(self, url, as_token):
@@ -302,7 +301,7 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase):
             (service.id,)
         )
         self.assertEquals(1, len(res))
-        self.assertEquals(str(txn_id), res[0][0])
+        self.assertEquals(txn_id, res[0][0])
 
         res = yield self.db_pool.runQuery(
             "SELECT * FROM application_services_txns WHERE txn_id=?",
@@ -325,7 +324,7 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase):
             (service.id,)
         )
         self.assertEquals(1, len(res))
-        self.assertEquals(str(txn_id), res[0][0])
+        self.assertEquals(txn_id, res[0][0])
         self.assertEquals(ApplicationServiceState.UP, res[0][1])
 
         res = yield self.db_pool.runQuery(
diff --git a/tests/storage/test_base.py b/tests/storage/test_base.py
index 7f5845cf0c..a64d2b821e 100644
--- a/tests/storage/test_base.py
+++ b/tests/storage/test_base.py
@@ -24,6 +24,7 @@ from collections import OrderedDict
 from synapse.server import HomeServer
 
 from synapse.storage._base import SQLBaseStore
+from synapse.storage.engines import create_engine
 
 
 class SQLBaseStoreTestCase(unittest.TestCase):
@@ -32,15 +33,26 @@ class SQLBaseStoreTestCase(unittest.TestCase):
     def setUp(self):
         self.db_pool = Mock(spec=["runInteraction"])
         self.mock_txn = Mock()
+        self.mock_conn = Mock(spec_set=["cursor"])
+        self.mock_conn.cursor.return_value = self.mock_txn
         # Our fake runInteraction just runs synchronously inline
 
         def runInteraction(func, *args, **kwargs):
             return defer.succeed(func(self.mock_txn, *args, **kwargs))
         self.db_pool.runInteraction = runInteraction
 
+        def runWithConnection(func, *args, **kwargs):
+            return defer.succeed(func(self.mock_conn, *args, **kwargs))
+        self.db_pool.runWithConnection = runWithConnection
+
         config = Mock()
         config.event_cache_size = 1
-        hs = HomeServer("test", db_pool=self.db_pool, config=config)
+        hs = HomeServer(
+            "test",
+            db_pool=self.db_pool,
+            config=config,
+            database_engine=create_engine("sqlite3"),
+        )
 
         self.datastore = SQLBaseStore(hs)
 
@@ -86,8 +98,7 @@ class SQLBaseStoreTestCase(unittest.TestCase):
 
         self.assertEquals("Value", value)
         self.mock_txn.execute.assert_called_with(
-                "SELECT retcol FROM tablename WHERE keycol = ? "
-                "ORDER BY rowid asc",
+                "SELECT retcol FROM tablename WHERE keycol = ?",
                 ["TheKey"]
         )
 
@@ -104,8 +115,7 @@ class SQLBaseStoreTestCase(unittest.TestCase):
 
         self.assertEquals({"colA": 1, "colB": 2, "colC": 3}, ret)
         self.mock_txn.execute.assert_called_with(
-                "SELECT colA, colB, colC FROM tablename WHERE keycol = ? "
-                "ORDER BY rowid asc",
+                "SELECT colA, colB, colC FROM tablename WHERE keycol = ?",
                 ["TheKey"]
         )
 
@@ -139,8 +149,7 @@ class SQLBaseStoreTestCase(unittest.TestCase):
 
         self.assertEquals([{"colA": 1}, {"colA": 2}, {"colA": 3}], ret)
         self.mock_txn.execute.assert_called_with(
-                "SELECT colA FROM tablename WHERE keycol = ? "
-                "ORDER BY rowid asc",
+                "SELECT colA FROM tablename WHERE keycol = ?",
                 ["A set"]
         )
 
@@ -189,8 +198,7 @@ class SQLBaseStoreTestCase(unittest.TestCase):
 
         self.assertEquals({"columname": "Old Value"}, ret)
         self.mock_txn.execute.assert_has_calls([
-                call('SELECT columname FROM tablename WHERE keycol = ? '
-                     'ORDER BY rowid asc',
+                call('SELECT columname FROM tablename WHERE keycol = ?',
                     ['TheKey']),
                 call("UPDATE tablename SET columname = ? WHERE keycol = ?",
                     ["New Value", "TheKey"])
diff --git a/tests/storage/test_registration.py b/tests/storage/test_registration.py
index e0b81f2b57..78f6004204 100644
--- a/tests/storage/test_registration.py
+++ b/tests/storage/test_registration.py
@@ -42,28 +42,38 @@ class RegistrationStoreTestCase(unittest.TestCase):
         self.assertEquals(
             # TODO(paul): Surely this field should be 'user_id', not 'name'
             #  Additionally surely it shouldn't come in a 1-element list
-            [{"name": self.user_id, "password_hash": self.pwhash}],
+            {"name": self.user_id, "password_hash": self.pwhash},
             (yield self.store.get_user_by_id(self.user_id))
         )
 
-        self.assertEquals(
-            {"admin": 0,
-             "device_id": None,
-             "name": self.user_id,
-             "token_id": 1},
-            (yield self.store.get_user_by_token(self.tokens[0]))
+        result = yield self.store.get_user_by_token(self.tokens[1])
+
+        self.assertDictContainsSubset(
+            {
+                "admin": 0,
+                 "device_id": None,
+                 "name": self.user_id,
+            },
+            result
         )
 
+        self.assertTrue("token_id" in result)
+
     @defer.inlineCallbacks
     def test_add_tokens(self):
         yield self.store.register(self.user_id, self.tokens[0], self.pwhash)
         yield self.store.add_access_token_to_user(self.user_id, self.tokens[1])
 
-        self.assertEquals(
-            {"admin": 0,
-             "device_id": None,
-             "name": self.user_id,
-             "token_id": 2},
-            (yield self.store.get_user_by_token(self.tokens[1]))
+        result = yield self.store.get_user_by_token(self.tokens[1])
+
+        self.assertDictContainsSubset(
+            {
+                "admin": 0,
+                 "device_id": None,
+                 "name": self.user_id,
+            },
+            result
         )
 
+        self.assertTrue("token_id" in result)
+
diff --git a/tests/storage/test_roommember.py b/tests/storage/test_roommember.py
index 811fea544b..785953cc89 100644
--- a/tests/storage/test_roommember.py
+++ b/tests/storage/test_roommember.py
@@ -119,7 +119,7 @@ class RoomMemberStoreTestCase(unittest.TestCase):
         yield self.inject_room_member(self.room, self.u_alice, Membership.JOIN)
 
         self.assertEquals(
-            ["test"],
+            {"test"},
             (yield self.store.get_joined_hosts_for_room(self.room.to_string()))
         )
 
@@ -127,7 +127,7 @@ class RoomMemberStoreTestCase(unittest.TestCase):
         yield self.inject_room_member(self.room, self.u_bob, Membership.JOIN)
 
         self.assertEquals(
-            ["test"],
+            {"test"},
             (yield self.store.get_joined_hosts_for_room(self.room.to_string()))
         )
 
@@ -136,9 +136,9 @@ class RoomMemberStoreTestCase(unittest.TestCase):
 
         self.assertEquals(
             {"test", "elsewhere"},
-            set((yield
+            (yield
                 self.store.get_joined_hosts_for_room(self.room.to_string())
-            ))
+            )
         )
 
         # Should still have both hosts
@@ -146,15 +146,15 @@ class RoomMemberStoreTestCase(unittest.TestCase):
 
         self.assertEquals(
             {"test", "elsewhere"},
-            set((yield
+            (yield
                 self.store.get_joined_hosts_for_room(self.room.to_string())
-            ))
+            )
         )
 
         # Should have only one host after other leaves
         yield self.inject_room_member(self.room, self.u_charlie, Membership.LEAVE)
 
         self.assertEquals(
-            ["test"],
+            {"test"},
             (yield self.store.get_joined_hosts_for_room(self.room.to_string()))
         )
diff --git a/tests/utils.py b/tests/utils.py
index 81e82a80df..cc038fecf1 100644
--- a/tests/utils.py
+++ b/tests/utils.py
@@ -17,6 +17,7 @@ from synapse.http.server import HttpServer
 from synapse.api.errors import cs_error, CodeMessageException, StoreError
 from synapse.api.constants import EventTypes
 from synapse.storage import prepare_database
+from synapse.storage.engines import create_engine
 from synapse.server import HomeServer
 
 from synapse.util.logcontext import LoggingContext
@@ -44,18 +45,23 @@ def setup_test_homeserver(name="test", datastore=None, config=None, **kargs):
         config.event_cache_size = 1
         config.disable_registration = False
 
+    if "clock" not in kargs:
+        kargs["clock"] = MockClock()
+
     if datastore is None:
         db_pool = SQLiteMemoryDbPool()
         yield db_pool.prepare()
         hs = HomeServer(
             name, db_pool=db_pool, config=config,
             version_string="Synapse/tests",
+            database_engine=create_engine("sqlite3"),
             **kargs
         )
     else:
         hs = HomeServer(
             name, db_pool=None, datastore=datastore, config=config,
             version_string="Synapse/tests",
+            database_engine=create_engine("sqlite3"),
             **kargs
         )
 
@@ -227,7 +233,10 @@ class SQLiteMemoryDbPool(ConnectionPool, object):
         )
 
     def prepare(self):
-        return self.runWithConnection(prepare_database)
+        engine = create_engine("sqlite3")
+        return self.runWithConnection(
+            lambda conn: prepare_database(conn, engine)
+        )
 
 
 class MemoryDataStore(object):