summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
authorMark Haines <mark.haines@matrix.org>2015-04-29 13:15:14 +0100
committerMark Haines <mark.haines@matrix.org>2015-04-29 13:15:14 +0100
commit4ad8b451559682645c818f6c1180e5f9d42a7eeb (patch)
tree94fe0beaaaa4ed8c1bb9451f1274df819d9ce8d7 /synapse/storage
parentUpdate the query format used by keyring to match current key v2 spec (diff)
parentMention that postgres databases must have the correct charset encoding (diff)
downloadsynapse-4ad8b451559682645c818f6c1180e5f9d42a7eeb.tar.xz
Merge branch 'develop' into key_distribution
Conflicts:
	synapse/config/homeserver.py
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/__init__.py146
-rw-r--r--synapse/storage/_base.py164
-rw-r--r--synapse/storage/appservice.py4
-rw-r--r--synapse/storage/directory.py8
-rw-r--r--synapse/storage/engines/__init__.py37
-rw-r--r--synapse/storage/engines/postgres.py47
-rw-r--r--synapse/storage/engines/sqlite3.py43
-rw-r--r--synapse/storage/event_federation.py35
-rw-r--r--synapse/storage/events.py160
-rw-r--r--synapse/storage/keys.py16
-rw-r--r--synapse/storage/presence.py1
-rw-r--r--synapse/storage/push_rule.py4
-rw-r--r--synapse/storage/pusher.py117
-rw-r--r--synapse/storage/registration.py119
-rw-r--r--synapse/storage/room.py39
-rw-r--r--synapse/storage/roommember.py148
-rw-r--r--synapse/storage/schema/delta/12/v12.sql34
-rw-r--r--synapse/storage/schema/delta/13/v13.sql7
-rw-r--r--synapse/storage/schema/delta/15/appservice_txns.sql11
-rw-r--r--synapse/storage/schema/delta/15/presence_indices.sql2
-rw-r--r--synapse/storage/schema/delta/15/v15.sql25
-rw-r--r--synapse/storage/schema/delta/16/events_order_index.sql4
-rw-r--r--synapse/storage/schema/delta/16/remote_media_cache_index.sql2
-rw-r--r--synapse/storage/schema/delta/16/remove_duplicates.sql9
-rw-r--r--synapse/storage/schema/delta/16/room_alias_index.sql3
-rw-r--r--synapse/storage/schema/delta/16/unique_constraints.sql80
-rw-r--r--synapse/storage/schema/delta/16/users.sql56
-rw-r--r--synapse/storage/schema/full_schemas/11/event_edges.sql42
-rw-r--r--synapse/storage/schema/full_schemas/11/event_signatures.sql34
-rw-r--r--synapse/storage/schema/full_schemas/11/im.sql62
-rw-r--r--synapse/storage/schema/full_schemas/11/keys.sql12
-rw-r--r--synapse/storage/schema/full_schemas/11/media_repository.sql21
-rw-r--r--synapse/storage/schema/full_schemas/11/presence.sql19
-rw-r--r--synapse/storage/schema/full_schemas/11/profiles.sql5
-rw-r--r--synapse/storage/schema/full_schemas/11/redactions.sql6
-rw-r--r--synapse/storage/schema/full_schemas/11/room_aliases.sql3
-rw-r--r--synapse/storage/schema/full_schemas/11/state.sql17
-rw-r--r--synapse/storage/schema/full_schemas/11/transactions.sql33
-rw-r--r--synapse/storage/schema/full_schemas/11/users.sql20
-rw-r--r--synapse/storage/schema/full_schemas/16/application_services.sql48
-rw-r--r--synapse/storage/schema/full_schemas/16/event_edges.sql89
-rw-r--r--synapse/storage/schema/full_schemas/16/event_signatures.sql55
-rw-r--r--synapse/storage/schema/full_schemas/16/im.sql128
-rw-r--r--synapse/storage/schema/full_schemas/16/keys.sql31
-rw-r--r--synapse/storage/schema/full_schemas/16/media_repository.sql68
-rw-r--r--synapse/storage/schema/full_schemas/16/presence.sql40
-rw-r--r--synapse/storage/schema/full_schemas/16/profiles.sql20
-rw-r--r--synapse/storage/schema/full_schemas/16/push.sql73
-rw-r--r--synapse/storage/schema/full_schemas/16/redactions.sql22
-rw-r--r--synapse/storage/schema/full_schemas/16/room_aliases.sql29
-rw-r--r--synapse/storage/schema/full_schemas/16/state.sql40
-rw-r--r--synapse/storage/schema/full_schemas/16/transactions.sql63
-rw-r--r--synapse/storage/schema/full_schemas/16/users.sql42
-rw-r--r--synapse/storage/schema/schema_version.sql9
-rw-r--r--synapse/storage/signatures.py6
-rw-r--r--synapse/storage/state.py13
-rw-r--r--synapse/storage/stream.py51
-rw-r--r--synapse/storage/transactions.py213
-rw-r--r--synapse/storage/util/__init__.py14
-rw-r--r--synapse/storage/util/id_generators.py131
60 files changed, 2046 insertions, 734 deletions
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index f4dec70393..61215bbc7b 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -51,7 +51,7 @@ logger = logging.getLogger(__name__)
 
 # Remember to update this number every time a change is made to database
 # schema files, so the users will be informed on server restarts.
-SCHEMA_VERSION = 15
+SCHEMA_VERSION = 16
 
 dir_path = os.path.abspath(os.path.dirname(__file__))
 
@@ -104,14 +104,16 @@ class DataStore(RoomMemberStore, RoomStore,
 
         self.client_ip_last_seen.prefill(*key + (now,))
 
-        yield self._simple_insert(
+        yield self._simple_upsert(
             "user_ips",
-            {
-                "user": user.to_string(),
+            keyvalues={
+                "user_id": user.to_string(),
                 "access_token": access_token,
-                "device_id": device_id,
                 "ip": ip,
                 "user_agent": user_agent,
+            },
+            values={
+                "device_id": device_id,
                 "last_seen": now,
             },
             desc="insert_client_ip",
@@ -120,7 +122,7 @@ class DataStore(RoomMemberStore, RoomStore,
     def get_user_ip_and_agents(self, user):
         return self._simple_select_list(
             table="user_ips",
-            keyvalues={"user": user.to_string()},
+            keyvalues={"user_id": user.to_string()},
             retcols=[
                 "device_id", "access_token", "ip", "user_agent", "last_seen"
             ],
@@ -148,21 +150,23 @@ class UpgradeDatabaseException(PrepareDatabaseException):
     pass
 
 
-def prepare_database(db_conn):
+def prepare_database(db_conn, database_engine):
     """Prepares a database for usage. Will either create all necessary tables
     or upgrade from an older schema version.
     """
     try:
         cur = db_conn.cursor()
-        version_info = _get_or_create_schema_state(cur)
+        version_info = _get_or_create_schema_state(cur, database_engine)
 
         if version_info:
             user_version, delta_files, upgraded = version_info
-            _upgrade_existing_database(cur, user_version, delta_files, upgraded)
+            _upgrade_existing_database(
+                cur, user_version, delta_files, upgraded, database_engine
+            )
         else:
-            _setup_new_database(cur)
+            _setup_new_database(cur, database_engine)
 
-        cur.execute("PRAGMA user_version = %d" % (SCHEMA_VERSION,))
+        # cur.execute("PRAGMA user_version = %d" % (SCHEMA_VERSION,))
 
         cur.close()
         db_conn.commit()
@@ -171,7 +175,7 @@ def prepare_database(db_conn):
         raise
 
 
-def _setup_new_database(cur):
+def _setup_new_database(cur, database_engine):
     """Sets up the database by finding a base set of "full schemas" and then
     applying any necessary deltas.
 
@@ -225,31 +229,30 @@ def _setup_new_database(cur):
 
     directory_entries = os.listdir(sql_dir)
 
-    sql_script = "BEGIN TRANSACTION;\n"
     for filename in fnmatch.filter(directory_entries, "*.sql"):
         sql_loc = os.path.join(sql_dir, filename)
         logger.debug("Applying schema %s", sql_loc)
-        sql_script += read_schema(sql_loc)
-        sql_script += "\n"
-    sql_script += "COMMIT TRANSACTION;"
-    cur.executescript(sql_script)
+        executescript(cur, sql_loc)
 
     cur.execute(
-        "INSERT OR REPLACE INTO schema_version (version, upgraded)"
-        " VALUES (?,?)",
-        (max_current_ver, False)
+        database_engine.convert_param_style(
+            "INSERT INTO schema_version (version, upgraded)"
+            " VALUES (?,?)"
+        ),
+        (max_current_ver, False,)
     )
 
     _upgrade_existing_database(
         cur,
         current_version=max_current_ver,
         applied_delta_files=[],
-        upgraded=False
+        upgraded=False,
+        database_engine=database_engine,
     )
 
 
 def _upgrade_existing_database(cur, current_version, applied_delta_files,
-                               upgraded):
+                               upgraded, database_engine):
     """Upgrades an existing database.
 
     Delta files can either be SQL stored in *.sql files, or python modules
@@ -305,6 +308,8 @@ def _upgrade_existing_database(cur, current_version, applied_delta_files,
     if not upgraded:
         start_ver += 1
 
+    logger.debug("applied_delta_files: %s", applied_delta_files)
+
     for v in range(start_ver, SCHEMA_VERSION + 1):
         logger.debug("Upgrading schema to v%d", v)
 
@@ -321,6 +326,7 @@ def _upgrade_existing_database(cur, current_version, applied_delta_files,
         directory_entries.sort()
         for file_name in directory_entries:
             relative_path = os.path.join(str(v), file_name)
+            logger.debug("Found file: %s", relative_path)
             if relative_path in applied_delta_files:
                 continue
 
@@ -342,9 +348,8 @@ def _upgrade_existing_database(cur, current_version, applied_delta_files,
                 module.run_upgrade(cur)
             elif ext == ".sql":
                 # A plain old .sql file, just read and execute it
-                delta_schema = read_schema(absolute_path)
                 logger.debug("Applying schema %s", relative_path)
-                cur.executescript(delta_schema)
+                executescript(cur, absolute_path)
             else:
                 # Not a valid delta file.
                 logger.warn(
@@ -356,24 +361,82 @@ def _upgrade_existing_database(cur, current_version, applied_delta_files,
 
             # Mark as done.
             cur.execute(
-                "INSERT INTO applied_schema_deltas (version, file)"
-                " VALUES (?,?)",
+                database_engine.convert_param_style(
+                    "INSERT INTO applied_schema_deltas (version, file)"
+                    " VALUES (?,?)",
+                ),
                 (v, relative_path)
             )
 
             cur.execute(
-                "INSERT OR REPLACE INTO schema_version (version, upgraded)"
-                " VALUES (?,?)",
+                database_engine.convert_param_style(
+                    "REPLACE INTO schema_version (version, upgraded)"
+                    " VALUES (?,?)",
+                ),
                 (v, True)
             )
 
 
-def _get_or_create_schema_state(txn):
+def get_statements(f):
+    statement_buffer = ""
+    in_comment = False  # If we're in a /* ... */ style comment
+
+    for line in f:
+        line = line.strip()
+
+        if in_comment:
+            # Check if this line contains an end to the comment
+            comments = line.split("*/", 1)
+            if len(comments) == 1:
+                continue
+            line = comments[1]
+            in_comment = False
+
+        # Remove inline block comments
+        line = re.sub(r"/\*.*\*/", " ", line)
+
+        # Does this line start a comment?
+        comments = line.split("/*", 1)
+        if len(comments) > 1:
+            line = comments[0]
+            in_comment = True
+
+        # Deal with line comments
+        line = line.split("--", 1)[0]
+        line = line.split("//", 1)[0]
+
+        # Find *all* semicolons. We need to treat first and last entry
+        # specially.
+        statements = line.split(";")
+
+        # We must prepend statement_buffer to the first statement
+        first_statement = "%s %s" % (
+            statement_buffer.strip(),
+            statements[0].strip()
+        )
+        statements[0] = first_statement
+
+        # Every entry, except the last, is a full statement
+        for statement in statements[:-1]:
+            yield statement.strip()
+
+        # The last entry did *not* end in a semicolon, so we store it for the
+        # next semicolon we find
+        statement_buffer = statements[-1].strip()
+
+
+def executescript(txn, schema_path):
+    with open(schema_path, 'r') as f:
+        for statement in get_statements(f):
+            txn.execute(statement)
+
+
+def _get_or_create_schema_state(txn, database_engine):
+    # Bluntly try creating the schema_version tables.
     schema_path = os.path.join(
         dir_path, "schema", "schema_version.sql",
     )
-    create_schema = read_schema(schema_path)
-    txn.executescript(create_schema)
+    executescript(txn, schema_path)
 
     txn.execute("SELECT version, upgraded FROM schema_version")
     row = txn.fetchone()
@@ -382,10 +445,13 @@ def _get_or_create_schema_state(txn):
 
     if current_version:
         txn.execute(
-            "SELECT file FROM applied_schema_deltas WHERE version >= ?",
+            database_engine.convert_param_style(
+                "SELECT file FROM applied_schema_deltas WHERE version >= ?"
+            ),
             (current_version,)
         )
-        return current_version, txn.fetchall(), upgraded
+        applied_deltas = [d for d, in txn.fetchall()]
+        return current_version, applied_deltas, upgraded
 
     return None
 
@@ -417,7 +483,19 @@ def prepare_sqlite3_database(db_conn):
 
             if row and row[0]:
                 db_conn.execute(
-                    "INSERT OR REPLACE INTO schema_version (version, upgraded)"
+                    "REPLACE INTO schema_version (version, upgraded)"
                     " VALUES (?,?)",
                     (row[0], False)
                 )
+
+
+def are_all_users_on_domain(txn, database_engine, domain):
+    sql = database_engine.convert_param_style(
+        "SELECT COUNT(*) FROM users WHERE name NOT LIKE ?"
+    )
+    pat = "%:" + domain
+    txn.execute(sql, (pat,))
+    num_not_matching = txn.fetchall()[0][0]
+    if num_not_matching == 0:
+        return True
+    return False
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index e3e67d8e0d..6017c2a6e8 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -22,6 +22,8 @@ from synapse.util.logcontext import PreserveLoggingContext, LoggingContext
 from synapse.util.lrucache import LruCache
 import synapse.metrics
 
+from util.id_generators import IdGenerator, StreamIdGenerator
+
 from twisted.internet import defer
 
 from collections import namedtuple, OrderedDict
@@ -145,11 +147,12 @@ class LoggingTransaction(object):
     """An object that almost-transparently proxies for the 'txn' object
     passed to the constructor. Adds logging and metrics to the .execute()
     method."""
-    __slots__ = ["txn", "name"]
+    __slots__ = ["txn", "name", "database_engine"]
 
-    def __init__(self, txn, name):
+    def __init__(self, txn, name, database_engine):
         object.__setattr__(self, "txn", txn)
         object.__setattr__(self, "name", name)
+        object.__setattr__(self, "database_engine", database_engine)
 
     def __getattr__(self, name):
         return getattr(self.txn, name)
@@ -161,26 +164,32 @@ class LoggingTransaction(object):
         # TODO(paul): Maybe use 'info' and 'debug' for values?
         sql_logger.debug("[SQL] {%s} %s", self.name, sql)
 
-        try:
-            if args and args[0]:
-                values = args[0]
+        sql = self.database_engine.convert_param_style(sql)
+
+        if args and args[0]:
+            args = list(args)
+            args[0] = [
+                self.database_engine.encode_parameter(a) for a in args[0]
+            ]
+            try:
                 sql_logger.debug(
-                    "[SQL values] {%s} " + ", ".join(("<%r>",) * len(values)),
+                    "[SQL values] {%s} " + ", ".join(("<%r>",) * len(args[0])),
                     self.name,
-                    *values
+                    *args[0]
                 )
-        except:
-            # Don't let logging failures stop SQL from working
-            pass
+            except:
+                # Don't let logging failures stop SQL from working
+                pass
 
         start = time.time() * 1000
+
         try:
             return self.txn.execute(
                 sql, *args, **kwargs
             )
-        except:
-                logger.exception("[SQL FAIL] {%s}", self.name)
-                raise
+        except Exception as e:
+            logger.debug("[SQL FAIL] {%s} %s", self.name, e)
+            raise
         finally:
             msecs = (time.time() * 1000) - start
             sql_logger.debug("[SQL time] {%s} %f", self.name, msecs)
@@ -245,6 +254,14 @@ class SQLBaseStore(object):
         self._get_event_cache = Cache("*getEvent*", keylen=3, lru=True,
                                       max_entries=hs.config.event_cache_size)
 
+        self.database_engine = hs.database_engine
+
+        self._stream_id_gen = StreamIdGenerator()
+        self._transaction_id_gen = IdGenerator("sent_transactions", "id", self)
+        self._state_groups_id_gen = IdGenerator("state_groups", "id", self)
+        self._access_tokens_id_gen = IdGenerator("access_tokens", "id", self)
+        self._pushers_id_gen = IdGenerator("pushers", "id", self)
+
     def start_profiling(self):
         self._previous_loop_ts = self._clock.time_msec()
 
@@ -281,8 +298,11 @@ class SQLBaseStore(object):
 
         start_time = time.time() * 1000
 
-        def inner_func(txn, *args, **kwargs):
+        def inner_func(conn, *args, **kwargs):
             with LoggingContext("runInteraction") as context:
+                if self.database_engine.is_connection_closed(conn):
+                    conn.reconnect()
+
                 current_context.copy_to(context)
                 start = time.time() * 1000
                 txn_id = self._TXN_ID
@@ -296,9 +316,48 @@ class SQLBaseStore(object):
                 sql_scheduling_timer.inc_by(time.time() * 1000 - start_time)
                 transaction_logger.debug("[TXN START] {%s}", name)
                 try:
-                    return func(LoggingTransaction(txn, name), *args, **kwargs)
-                except:
-                    logger.exception("[TXN FAIL] {%s}", name)
+                    i = 0
+                    N = 5
+                    while True:
+                        try:
+                            txn = conn.cursor()
+                            return func(
+                                LoggingTransaction(txn, name, self.database_engine),
+                                *args, **kwargs
+                            )
+                        except self.database_engine.module.OperationalError as e:
+                            # This can happen if the database disappears mid
+                            # transaction.
+                            logger.warn(
+                                "[TXN OPERROR] {%s} %s %d/%d",
+                                name, e, i, N
+                            )
+                            if i < N:
+                                i += 1
+                                try:
+                                    conn.rollback()
+                                except self.database_engine.module.Error as e1:
+                                    logger.warn(
+                                        "[TXN EROLL] {%s} %s",
+                                        name, e1,
+                                    )
+                                continue
+                        except self.database_engine.module.DatabaseError as e:
+                            if self.database_engine.is_deadlock(e):
+                                logger.warn("[TXN DEADLOCK] {%s} %d/%d", name, i, N)
+                                if i < N:
+                                    i += 1
+                                    try:
+                                        conn.rollback()
+                                    except self.database_engine.module.Error as e1:
+                                        logger.warn(
+                                            "[TXN EROLL] {%s} %s",
+                                            name, e1,
+                                        )
+                                    continue
+                            raise
+                except Exception as e:
+                    logger.debug("[TXN FAIL] {%s} %s", name, e)
                     raise
                 finally:
                     end = time.time() * 1000
@@ -311,7 +370,7 @@ class SQLBaseStore(object):
                     sql_txn_timer.inc_by(duration, desc)
 
         with PreserveLoggingContext():
-            result = yield self._db_pool.runInteraction(
+            result = yield self._db_pool.runWithConnection(
                 inner_func, *args, **kwargs
             )
         defer.returnValue(result)
@@ -342,11 +401,11 @@ class SQLBaseStore(object):
             The result of decoder(results)
         """
         def interaction(txn):
-            cursor = txn.execute(query, args)
+            txn.execute(query, args)
             if decoder:
-                return decoder(cursor)
+                return decoder(txn)
             else:
-                return cursor.fetchall()
+                return txn.fetchall()
 
         return self.runInteraction(desc, interaction)
 
@@ -356,27 +415,29 @@ class SQLBaseStore(object):
     # "Simple" SQL API methods that operate on a single table with no JOINs,
     # no complex WHERE clauses, just a dict of values for columns.
 
-    def _simple_insert(self, table, values, or_replace=False, or_ignore=False,
+    @defer.inlineCallbacks
+    def _simple_insert(self, table, values, or_ignore=False,
                        desc="_simple_insert"):
         """Executes an INSERT query on the named table.
 
         Args:
             table : string giving the table name
             values : dict of new column names and values for them
-            or_replace : bool; if True performs an INSERT OR REPLACE
         """
-        return self.runInteraction(
-            desc,
-            self._simple_insert_txn, table, values, or_replace=or_replace,
-            or_ignore=or_ignore,
-        )
+        try:
+            yield self.runInteraction(
+                desc,
+                self._simple_insert_txn, table, values,
+            )
+        except self.database_engine.module.IntegrityError:
+            # We have to do or_ignore flag at this layer, since we can't reuse
+            # a cursor after we receive an error from the db.
+            if not or_ignore:
+                raise
 
     @log_function
-    def _simple_insert_txn(self, txn, table, values, or_replace=False,
-                           or_ignore=False):
-        sql = "%s INTO %s (%s) VALUES(%s)" % (
-            ("INSERT OR REPLACE" if or_replace else
-             "INSERT OR IGNORE" if or_ignore else "INSERT"),
+    def _simple_insert_txn(self, txn, table, values):
+        sql = "INSERT INTO %s (%s) VALUES(%s)" % (
             table,
             ", ".join(k for k in values),
             ", ".join("?" for k in values)
@@ -388,22 +449,26 @@ class SQLBaseStore(object):
         )
 
         txn.execute(sql, values.values())
-        return txn.lastrowid
 
-    def _simple_upsert(self, table, keyvalues, values, desc="_simple_upsert"):
+    def _simple_upsert(self, table, keyvalues, values,
+                       insertion_values={}, desc="_simple_upsert"):
         """
         Args:
             table (str): The table to upsert into
             keyvalues (dict): The unique key tables and their new values
             values (dict): The nonunique columns and their new values
+            insertion_values (dict): key/values to use when inserting
         Returns: A deferred
         """
         return self.runInteraction(
             desc,
-            self._simple_upsert_txn, table, keyvalues, values
+            self._simple_upsert_txn, table, keyvalues, values, insertion_values,
         )
 
-    def _simple_upsert_txn(self, txn, table, keyvalues, values):
+    def _simple_upsert_txn(self, txn, table, keyvalues, values, insertion_values={}):
+        # We need to lock the table :(
+        self.database_engine.lock_table(txn, table)
+
         # Try to update
         sql = "UPDATE %s SET %s WHERE %s" % (
             table,
@@ -422,6 +487,7 @@ class SQLBaseStore(object):
             allvalues = {}
             allvalues.update(keyvalues)
             allvalues.update(values)
+            allvalues.update(insertion_values)
 
             sql = "INSERT INTO %s (%s) VALUES (%s)" % (
                 table,
@@ -489,8 +555,7 @@ class SQLBaseStore(object):
 
     def _simple_select_onecol_txn(self, txn, table, keyvalues, retcol):
         sql = (
-            "SELECT %(retcol)s FROM %(table)s WHERE %(where)s "
-            "ORDER BY rowid asc"
+            "SELECT %(retcol)s FROM %(table)s WHERE %(where)s"
         ) % {
             "retcol": retcol,
             "table": table,
@@ -548,14 +613,14 @@ class SQLBaseStore(object):
             retcols : list of strings giving the names of the columns to return
         """
         if keyvalues:
-            sql = "SELECT %s FROM %s WHERE %s ORDER BY rowid asc" % (
+            sql = "SELECT %s FROM %s WHERE %s" % (
                 ", ".join(retcols),
                 table,
                 " AND ".join("%s = ?" % (k, ) for k in keyvalues)
             )
             txn.execute(sql, keyvalues.values())
         else:
-            sql = "SELECT %s FROM %s ORDER BY rowid asc" % (
+            sql = "SELECT %s FROM %s" % (
                 ", ".join(retcols),
                 table
             )
@@ -607,10 +672,10 @@ class SQLBaseStore(object):
 
     def _simple_select_one_txn(self, txn, table, keyvalues, retcols,
                                allow_none=False):
-        select_sql = "SELECT %s FROM %s WHERE %s ORDER BY rowid asc" % (
+        select_sql = "SELECT %s FROM %s WHERE %s" % (
             ", ".join(retcols),
             table,
-            " AND ".join("%s = ?" % (k) for k in keyvalues)
+            " AND ".join("%s = ?" % (k,) for k in keyvalues)
         )
 
         txn.execute(select_sql, keyvalues.values())
@@ -648,6 +713,11 @@ class SQLBaseStore(object):
                     updatevalues=updatevalues,
                 )
 
+                # if txn.rowcount == 0:
+                #     raise StoreError(404, "No row found")
+                if txn.rowcount > 1:
+                    raise StoreError(500, "More than one row matched")
+
             return ret
         return self.runInteraction(desc, func)
 
@@ -860,6 +930,12 @@ class SQLBaseStore(object):
         result = txn.fetchone()
         return result[0] if result else None
 
+    def get_next_stream_id(self):
+        with self._next_stream_id_lock:
+            i = self._next_stream_id
+            self._next_stream_id += 1
+            return i
+
 
 class _RollbackButIsFineException(Exception):
     """ This exception is used to rollback a transaction without implying
@@ -883,7 +959,7 @@ class Table(object):
 
     _select_where_clause = "SELECT %s FROM %s WHERE %s"
     _select_clause = "SELECT %s FROM %s"
-    _insert_clause = "INSERT OR REPLACE INTO %s (%s) VALUES (%s)"
+    _insert_clause = "REPLACE INTO %s (%s) VALUES (%s)"
 
     @classmethod
     def select_statement(cls, where_clause=None):
diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py
index f8cbb3f323..63d1af4e86 100644
--- a/synapse/storage/appservice.py
+++ b/synapse/storage/appservice.py
@@ -366,11 +366,11 @@ class ApplicationServiceTransactionStore(SQLBaseStore):
         new_txn_id = max(highest_txn_id, last_txn_id) + 1
 
         # Insert new txn into txn table
-        event_ids = [e.event_id for e in events]
+        event_ids = json.dumps([e.event_id for e in events])
         txn.execute(
             "INSERT INTO application_services_txns(as_id, txn_id, event_ids) "
             "VALUES(?,?,?)",
-            (service.id, new_txn_id, json.dumps(event_ids))
+            (service.id, new_txn_id, event_ids)
         )
         return AppServiceTransaction(
             service=service, id=new_txn_id, events=events
diff --git a/synapse/storage/directory.py b/synapse/storage/directory.py
index 0199539fea..2b2bdf8615 100644
--- a/synapse/storage/directory.py
+++ b/synapse/storage/directory.py
@@ -21,8 +21,6 @@ from twisted.internet import defer
 
 from collections import namedtuple
 
-import sqlite3
-
 
 RoomAliasMapping = namedtuple(
     "RoomAliasMapping",
@@ -91,7 +89,7 @@ class DirectoryStore(SQLBaseStore):
                 },
                 desc="create_room_alias_association",
             )
-        except sqlite3.IntegrityError:
+        except self.database_engine.module.IntegrityError:
             raise SynapseError(
                 409, "Room alias %s already exists" % room_alias.to_string()
             )
@@ -120,12 +118,12 @@ class DirectoryStore(SQLBaseStore):
         defer.returnValue(room_id)
 
     def _delete_room_alias_txn(self, txn, room_alias):
-        cursor = txn.execute(
+        txn.execute(
             "SELECT room_id FROM room_aliases WHERE room_alias = ?",
             (room_alias.to_string(),)
         )
 
-        res = cursor.fetchone()
+        res = txn.fetchone()
         if res:
             room_id = res[0]
         else:
diff --git a/synapse/storage/engines/__init__.py b/synapse/storage/engines/__init__.py
new file mode 100644
index 0000000000..eb76df7f01
--- /dev/null
+++ b/synapse/storage/engines/__init__.py
@@ -0,0 +1,37 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from .postgres import PostgresEngine
+from .sqlite3 import Sqlite3Engine
+
+import importlib
+
+
+SUPPORTED_MODULE = {
+    "sqlite3": Sqlite3Engine,
+    "psycopg2": PostgresEngine,
+}
+
+
+def create_engine(name):
+    engine_class = SUPPORTED_MODULE.get(name, None)
+
+    if engine_class:
+        module = importlib.import_module(name)
+        return engine_class(module)
+
+    raise RuntimeError(
+        "Unsupported database engine '%s'" % (name,)
+    )
diff --git a/synapse/storage/engines/postgres.py b/synapse/storage/engines/postgres.py
new file mode 100644
index 0000000000..b8cca9b187
--- /dev/null
+++ b/synapse/storage/engines/postgres.py
@@ -0,0 +1,47 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from synapse.storage import prepare_database
+
+
+class PostgresEngine(object):
+    def __init__(self, database_module):
+        self.module = database_module
+        self.module.extensions.register_type(self.module.extensions.UNICODE)
+
+    def convert_param_style(self, sql):
+        return sql.replace("?", "%s")
+
+    def encode_parameter(self, param):
+        return param
+
+    def on_new_connection(self, db_conn):
+        db_conn.set_isolation_level(
+            self.module.extensions.ISOLATION_LEVEL_REPEATABLE_READ
+        )
+
+    def prepare_database(self, db_conn):
+        prepare_database(db_conn, self)
+
+    def is_deadlock(self, error):
+        if isinstance(error, self.module.DatabaseError):
+            return error.pgcode in ["40001", "40P01"]
+        return False
+
+    def is_connection_closed(self, conn):
+        return bool(conn)
+
+    def lock_table(self, txn, table):
+        txn.execute("LOCK TABLE %s in EXCLUSIVE MODE" % (table,))
diff --git a/synapse/storage/engines/sqlite3.py b/synapse/storage/engines/sqlite3.py
new file mode 100644
index 0000000000..f62d5d1205
--- /dev/null
+++ b/synapse/storage/engines/sqlite3.py
@@ -0,0 +1,43 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from synapse.storage import prepare_database, prepare_sqlite3_database
+
+
+class Sqlite3Engine(object):
+    def __init__(self, database_module):
+        self.module = database_module
+
+    def convert_param_style(self, sql):
+        return sql
+
+    def encode_parameter(self, param):
+        return param
+
+    def on_new_connection(self, db_conn):
+        self.prepare_database(db_conn)
+
+    def prepare_database(self, db_conn):
+        prepare_sqlite3_database(db_conn)
+        prepare_database(db_conn, self)
+
+    def is_deadlock(self, error):
+        return False
+
+    def is_connection_closed(self, conn):
+        return False
+
+    def lock_table(self, txn, table):
+        return
diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py
index 032334bfd6..54a3c9d805 100644
--- a/synapse/storage/event_federation.py
+++ b/synapse/storage/event_federation.py
@@ -153,7 +153,7 @@ class EventFederationStore(SQLBaseStore):
         results = self._get_prev_events_and_state(
             txn,
             event_id,
-            is_state=1,
+            is_state=True,
         )
 
         return [(e_id, h, ) for e_id, h, _ in results]
@@ -164,7 +164,7 @@ class EventFederationStore(SQLBaseStore):
         }
 
         if is_state is not None:
-            keyvalues["is_state"] = is_state
+            keyvalues["is_state"] = bool(is_state)
 
         res = self._simple_select_list_txn(
             txn,
@@ -242,7 +242,6 @@ class EventFederationStore(SQLBaseStore):
                     "room_id": room_id,
                     "min_depth": depth,
                 },
-                or_replace=True,
             )
 
     def _handle_prev_events(self, txn, outlier, event_id, prev_events,
@@ -260,9 +259,8 @@ class EventFederationStore(SQLBaseStore):
                     "event_id": event_id,
                     "prev_event_id": e_id,
                     "room_id": room_id,
-                    "is_state": 0,
+                    "is_state": False,
                 },
-                or_ignore=True,
             )
 
         # Update the extremities table if this is not an outlier.
@@ -281,19 +279,19 @@ class EventFederationStore(SQLBaseStore):
             # We only insert as a forward extremity the new event if there are
             # no other events that reference it as a prev event
             query = (
-                "INSERT OR IGNORE INTO %(table)s (event_id, room_id) "
-                "SELECT ?, ? WHERE NOT EXISTS ("
-                "SELECT 1 FROM %(event_edges)s WHERE "
-                "prev_event_id = ? "
-                ")"
-            ) % {
-                "table": "event_forward_extremities",
-                "event_edges": "event_edges",
-            }
+                "SELECT 1 FROM event_edges WHERE prev_event_id = ?"
+            )
 
-            logger.debug("query: %s", query)
+            txn.execute(query, (event_id,))
 
-            txn.execute(query, (event_id, room_id, event_id))
+            if not txn.fetchone():
+                query = (
+                    "INSERT INTO event_forward_extremities"
+                    " (event_id, room_id)"
+                    " VALUES (?, ?)"
+                )
+
+                txn.execute(query, (event_id, room_id))
 
             # Insert all the prev_events as a backwards thing, they'll get
             # deleted in a second if they're incorrect anyway.
@@ -306,7 +304,6 @@ class EventFederationStore(SQLBaseStore):
                         "event_id": e_id,
                         "room_id": room_id,
                     },
-                    or_ignore=True,
                 )
 
             # Also delete from the backwards extremities table all ones that
@@ -400,7 +397,7 @@ class EventFederationStore(SQLBaseStore):
 
         query = (
             "SELECT prev_event_id FROM event_edges "
-            "WHERE room_id = ? AND event_id = ? AND is_state = 0 "
+            "WHERE room_id = ? AND event_id = ? AND is_state = ? "
             "LIMIT ?"
         )
 
@@ -409,7 +406,7 @@ class EventFederationStore(SQLBaseStore):
             for event_id in front:
                 txn.execute(
                     query,
-                    (room_id, event_id, limit - len(event_results))
+                    (room_id, event_id, False, limit - len(event_results))
                 )
 
                 for e_id, in txn.fetchall():
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 2425f57f5f..a3c260ddc4 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -52,7 +52,6 @@ class EventsStore(SQLBaseStore):
                 is_new_state=is_new_state,
                 current_state=current_state,
             )
-            self.get_room_events_max_id.invalidate()
         except _RollbackButIsFineException:
             pass
 
@@ -96,12 +95,22 @@ class EventsStore(SQLBaseStore):
         # Remove the any existing cache entries for the event_id
         self._invalidate_get_event_cache(event.event_id)
 
+        if stream_ordering is None:
+            with self._stream_id_gen.get_next_txn(txn) as stream_ordering:
+                return self._persist_event_txn(
+                    txn, event, context, backfilled,
+                    stream_ordering=stream_ordering,
+                    is_new_state=is_new_state,
+                    current_state=current_state,
+                )
+
         # We purposefully do this first since if we include a `current_state`
         # key, we *want* to update the `current_state_events` table
         if current_state:
-            txn.execute(
-                "DELETE FROM current_state_events WHERE room_id = ?",
-                (event.room_id,)
+            self._simple_delete_txn(
+                txn,
+                table="current_state_events",
+                keyvalues={"room_id": event.room_id},
             )
 
             for s in current_state:
@@ -114,7 +123,6 @@ class EventsStore(SQLBaseStore):
                         "type": s.type,
                         "state_key": s.state_key,
                     },
-                    or_replace=True,
                 )
 
         if event.is_state() and is_new_state:
@@ -128,7 +136,6 @@ class EventsStore(SQLBaseStore):
                         "type": event.type,
                         "state_key": event.state_key,
                     },
-                    or_replace=True,
                 )
 
                 for prev_state_id, _ in event.prev_state:
@@ -151,14 +158,6 @@ class EventsStore(SQLBaseStore):
                 event.depth
             )
 
-        self._handle_prev_events(
-            txn,
-            outlier=outlier,
-            event_id=event.event_id,
-            prev_events=event.prev_events,
-            room_id=event.room_id,
-        )
-
         have_persisted = self._simple_select_one_onecol_txn(
             txn,
             table="event_json",
@@ -169,7 +168,7 @@ class EventsStore(SQLBaseStore):
 
         metadata_json = encode_canonical_json(
             event.internal_metadata.get_dict()
-        )
+        ).decode("UTF-8")
 
         # If we have already persisted this event, we don't need to do any
         # more processing.
@@ -185,23 +184,29 @@ class EventsStore(SQLBaseStore):
                 )
                 txn.execute(
                     sql,
-                    (metadata_json.decode("UTF-8"), event.event_id,)
+                    (metadata_json, event.event_id,)
                 )
 
                 sql = (
-                    "UPDATE events SET outlier = 0"
+                    "UPDATE events SET outlier = ?"
                     " WHERE event_id = ?"
                 )
                 txn.execute(
                     sql,
-                    (event.event_id,)
+                    (False, event.event_id,)
                 )
             return
 
+        self._handle_prev_events(
+            txn,
+            outlier=outlier,
+            event_id=event.event_id,
+            prev_events=event.prev_events,
+            room_id=event.room_id,
+        )
+
         if event.type == EventTypes.Member:
             self._store_room_member_txn(txn, event)
-        elif event.type == EventTypes.Feedback:
-            self._store_feedback_txn(txn, event)
         elif event.type == EventTypes.Name:
             self._store_room_name_txn(txn, event)
         elif event.type == EventTypes.Topic:
@@ -224,10 +229,9 @@ class EventsStore(SQLBaseStore):
             values={
                 "event_id": event.event_id,
                 "room_id": event.room_id,
-                "internal_metadata": metadata_json.decode("UTF-8"),
+                "internal_metadata": metadata_json,
                 "json": encode_canonical_json(event_dict).decode("UTF-8"),
             },
-            or_replace=True,
         )
 
         content = encode_canonical_json(
@@ -245,9 +249,6 @@ class EventsStore(SQLBaseStore):
             "depth": event.depth,
         }
 
-        if stream_ordering is not None:
-            vals["stream_ordering"] = stream_ordering
-
         unrec = {
             k: v
             for k, v in event.get_dict().items()
@@ -264,25 +265,53 @@ class EventsStore(SQLBaseStore):
             unrec
         ).decode("UTF-8")
 
-        try:
-            self._simple_insert_txn(
-                txn,
-                "events",
-                vals,
-                or_replace=(not outlier),
-                or_ignore=bool(outlier),
-            )
-        except:
-            logger.warn(
-                "Failed to persist, probably duplicate: %s",
-                event.event_id,
-                exc_info=True,
+        sql = (
+            "INSERT INTO events"
+            " (stream_ordering, topological_ordering, event_id, type,"
+            " room_id, content, processed, outlier, depth)"
+            " VALUES (?,?,?,?,?,?,?,?,?)"
+        )
+
+        txn.execute(
+            sql,
+            (
+                stream_ordering, event.depth, event.event_id, event.type,
+                event.room_id, content, True, outlier, event.depth
             )
-            raise _RollbackButIsFineException("_persist_event")
+        )
 
         if context.rejected:
             self._store_rejections_txn(txn, event.event_id, context.rejected)
 
+        for hash_alg, hash_base64 in event.hashes.items():
+            hash_bytes = decode_base64(hash_base64)
+            self._store_event_content_hash_txn(
+                txn, event.event_id, hash_alg, hash_bytes,
+            )
+
+        for prev_event_id, prev_hashes in event.prev_events:
+            for alg, hash_base64 in prev_hashes.items():
+                hash_bytes = decode_base64(hash_base64)
+                self._store_prev_event_hash_txn(
+                    txn, event.event_id, prev_event_id, alg, hash_bytes
+                )
+
+        for auth_id, _ in event.auth_events:
+            self._simple_insert_txn(
+                txn,
+                table="event_auth",
+                values={
+                    "event_id": event.event_id,
+                    "room_id": event.room_id,
+                    "auth_id": auth_id,
+                },
+            )
+
+        (ref_alg, ref_hash_bytes) = compute_event_reference_hash(event)
+        self._store_event_reference_hash_txn(
+            txn, event.event_id, ref_alg, ref_hash_bytes
+        )
+
         if event.is_state():
             vals = {
                 "event_id": event.event_id,
@@ -301,18 +330,6 @@ class EventsStore(SQLBaseStore):
                 vals,
             )
 
-            if is_new_state and not context.rejected:
-                self._simple_insert_txn(
-                    txn,
-                    "current_state_events",
-                    {
-                        "event_id": event.event_id,
-                        "room_id": event.room_id,
-                        "type": event.type,
-                        "state_key": event.state_key,
-                    },
-                )
-
             for e_id, h in event.prev_state:
                 self._simple_insert_txn(
                     txn,
@@ -321,39 +338,24 @@ class EventsStore(SQLBaseStore):
                         "event_id": event.event_id,
                         "prev_event_id": e_id,
                         "room_id": event.room_id,
-                        "is_state": 1,
+                        "is_state": True,
                     },
                 )
 
-        for hash_alg, hash_base64 in event.hashes.items():
-            hash_bytes = decode_base64(hash_base64)
-            self._store_event_content_hash_txn(
-                txn, event.event_id, hash_alg, hash_bytes,
-            )
-
-        for prev_event_id, prev_hashes in event.prev_events:
-            for alg, hash_base64 in prev_hashes.items():
-                hash_bytes = decode_base64(hash_base64)
-                self._store_prev_event_hash_txn(
-                    txn, event.event_id, prev_event_id, alg, hash_bytes
+            if is_new_state and not context.rejected:
+                self._simple_upsert_txn(
+                    txn,
+                    "current_state_events",
+                    keyvalues={
+                        "room_id": event.room_id,
+                        "type": event.type,
+                        "state_key": event.state_key,
+                    },
+                    values={
+                        "event_id": event.event_id,
+                    }
                 )
 
-        for auth_id, _ in event.auth_events:
-            self._simple_insert_txn(
-                txn,
-                table="event_auth",
-                values={
-                    "event_id": event.event_id,
-                    "room_id": event.room_id,
-                    "auth_id": auth_id,
-                },
-            )
-
-        (ref_alg, ref_hash_bytes) = compute_event_reference_hash(event)
-        self._store_event_reference_hash_txn(
-            txn, event.event_id, ref_alg, ref_hash_bytes
-        )
-
     def _store_redaction(self, txn, event):
         # invalidate the cache for the redacted event
         self._invalidate_get_event_cache(event.redacts)
diff --git a/synapse/storage/keys.py b/synapse/storage/keys.py
index 22b158d71e..cbe9339ccf 100644
--- a/synapse/storage/keys.py
+++ b/synapse/storage/keys.py
@@ -57,16 +57,18 @@ class KeyStore(SQLBaseStore):
             OpenSSL.crypto.FILETYPE_ASN1, tls_certificate
         )
         fingerprint = hashlib.sha256(tls_certificate_bytes).hexdigest()
-        return self._simple_insert(
+        return self._simple_upsert(
             table="server_tls_certificates",
-            values={
+            keyvalues={
                 "server_name": server_name,
                 "fingerprint": fingerprint,
+            },
+            values={
                 "from_server": from_server,
                 "ts_added_ms": time_now_ms,
                 "tls_certificate": buffer(tls_certificate_bytes),
             },
-            or_ignore=True,
+            desc="store_server_certificate",
         )
 
     @defer.inlineCallbacks
@@ -107,16 +109,18 @@ class KeyStore(SQLBaseStore):
             ts_now_ms (int): The time now in milliseconds
             verification_key (VerifyKey): The NACL verify key.
         """
-        return self._simple_insert(
+        return self._simple_upsert(
             table="server_signature_keys",
-            values={
+            keyvalues={
                 "server_name": server_name,
                 "key_id": "%s:%s" % (verify_key.alg, verify_key.version),
+            },
+            values={
                 "from_server": from_server,
                 "ts_added_ms": time_now_ms,
                 "verify_key": buffer(verify_key.encode()),
             },
-            or_ignore=True,
+            desc="store_server_verify_key",
         )
 
     def store_server_keys_json(self, server_name, key_id, from_server,
diff --git a/synapse/storage/presence.py b/synapse/storage/presence.py
index 87fba55439..22ec94bc16 100644
--- a/synapse/storage/presence.py
+++ b/synapse/storage/presence.py
@@ -57,6 +57,7 @@ class PresenceStore(SQLBaseStore):
             values={"observed_user_id": observed_localpart,
                     "observer_user_id": observer_userid},
             desc="allow_presence_visible",
+            or_ignore=True,
         )
 
     def disallow_presence_visible(self, observed_localpart, observer_userid):
diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py
index c47bdc2861..ee7718d5ed 100644
--- a/synapse/storage/push_rule.py
+++ b/synapse/storage/push_rule.py
@@ -154,7 +154,7 @@ class PushRuleStore(SQLBaseStore):
             txn.execute(sql, (user_name, priority_class, new_rule_priority))
 
         # now insert the new rule
-        sql = "INSERT OR REPLACE INTO "+PushRuleTable.table_name+" ("
+        sql = "INSERT INTO "+PushRuleTable.table_name+" ("
         sql += ",".join(new_rule.keys())+") VALUES ("
         sql += ", ".join(["?" for _ in new_rule.keys()])+")"
 
@@ -183,7 +183,7 @@ class PushRuleStore(SQLBaseStore):
         new_rule['priority_class'] = priority_class
         new_rule['priority'] = new_prio
 
-        sql = "INSERT OR REPLACE INTO "+PushRuleTable.table_name+" ("
+        sql = "INSERT INTO "+PushRuleTable.table_name+" ("
         sql += ",".join(new_rule.keys())+") VALUES ("
         sql += ", ".join(["?" for _ in new_rule.keys()])+")"
 
diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py
index 000502b4ff..2582a1da66 100644
--- a/synapse/storage/pusher.py
+++ b/synapse/storage/pusher.py
@@ -13,13 +13,13 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import collections
-
 from ._base import SQLBaseStore, Table
 from twisted.internet import defer
 
 from synapse.api.errors import StoreError
 
+from syutil.jsonutil import encode_canonical_json
+
 import logging
 
 logger = logging.getLogger(__name__)
@@ -27,93 +27,55 @@ logger = logging.getLogger(__name__)
 
 class PusherStore(SQLBaseStore):
     @defer.inlineCallbacks
-    def get_pushers_by_app_id_and_pushkey(self, app_id_and_pushkey):
+    def get_pushers_by_app_id_and_pushkey(self, app_id, pushkey):
         sql = (
-            "SELECT id, user_name, kind, profile_tag, app_id,"
-            "app_display_name, device_display_name, pushkey, ts, data, "
-            "last_token, last_success, failing_since "
-            "FROM pushers "
+            "SELECT * FROM pushers "
             "WHERE app_id = ? AND pushkey = ?"
         )
 
-        rows = yield self._execute(
-            "get_pushers_by_app_id_and_pushkey", None, sql,
-            app_id_and_pushkey[0], app_id_and_pushkey[1]
+        rows = yield self._execute_and_decode(
+            "get_pushers_by_app_id_and_pushkey",
+            sql,
+            app_id, pushkey
         )
 
-        ret = [
-            {
-                "id": r[0],
-                "user_name": r[1],
-                "kind": r[2],
-                "profile_tag": r[3],
-                "app_id": r[4],
-                "app_display_name": r[5],
-                "device_display_name": r[6],
-                "pushkey": r[7],
-                "pushkey_ts": r[8],
-                "data": r[9],
-                "last_token": r[10],
-                "last_success": r[11],
-                "failing_since": r[12]
-            }
-            for r in rows
-        ]
-
-        defer.returnValue(ret[0])
+        defer.returnValue(rows)
 
     @defer.inlineCallbacks
     def get_all_pushers(self):
         sql = (
-            "SELECT id, user_name, kind, profile_tag, app_id,"
-            "app_display_name, device_display_name, pushkey, ts, data, "
-            "last_token, last_success, failing_since "
-            "FROM pushers"
+            "SELECT * FROM pushers"
         )
 
-        rows = yield self._execute("get_all_pushers", None, sql)
-
-        ret = [
-            {
-                "id": r[0],
-                "user_name": r[1],
-                "kind": r[2],
-                "profile_tag": r[3],
-                "app_id": r[4],
-                "app_display_name": r[5],
-                "device_display_name": r[6],
-                "pushkey": r[7],
-                "pushkey_ts": r[8],
-                "data": r[9],
-                "last_token": r[10],
-                "last_success": r[11],
-                "failing_since": r[12]
-            }
-            for r in rows
-        ]
-
-        defer.returnValue(ret)
+        rows = yield self._execute_and_decode("get_all_pushers", sql)
+
+        defer.returnValue(rows)
 
     @defer.inlineCallbacks
-    def add_pusher(self, user_name, profile_tag, kind, app_id,
+    def add_pusher(self, user_name, access_token, profile_tag, kind, app_id,
                    app_display_name, device_display_name,
                    pushkey, pushkey_ts, lang, data):
         try:
+            next_id = yield self._pushers_id_gen.get_next()
             yield self._simple_upsert(
                 PushersTable.table_name,
                 dict(
                     app_id=app_id,
                     pushkey=pushkey,
+                    user_name=user_name,
                 ),
                 dict(
-                    user_name=user_name,
+                    access_token=access_token,
                     kind=kind,
                     profile_tag=profile_tag,
                     app_display_name=app_display_name,
                     device_display_name=device_display_name,
                     ts=pushkey_ts,
                     lang=lang,
-                    data=data
+                    data=encode_canonical_json(data).decode("UTF-8"),
+                ),
+                insertion_values=dict(
+                    id=next_id,
                 ),
                 desc="add_pusher",
             )
@@ -122,37 +84,38 @@ class PusherStore(SQLBaseStore):
             raise StoreError(500, "Problem creating pusher.")
 
     @defer.inlineCallbacks
-    def delete_pusher_by_app_id_pushkey(self, app_id, pushkey):
+    def delete_pusher_by_app_id_pushkey_user_name(self, app_id, pushkey, user_name):
         yield self._simple_delete_one(
             PushersTable.table_name,
-            {"app_id": app_id, "pushkey": pushkey},
-            desc="delete_pusher_by_app_id_pushkey",
+            {"app_id": app_id, "pushkey": pushkey, 'user_name': user_name},
+            desc="delete_pusher_by_app_id_pushkey_user_name",
         )
 
     @defer.inlineCallbacks
-    def update_pusher_last_token(self, app_id, pushkey, last_token):
+    def update_pusher_last_token(self, app_id, pushkey, user_name, last_token):
         yield self._simple_update_one(
             PushersTable.table_name,
-            {'app_id': app_id, 'pushkey': pushkey},
+            {'app_id': app_id, 'pushkey': pushkey, 'user_name': user_name},
             {'last_token': last_token},
             desc="update_pusher_last_token",
         )
 
     @defer.inlineCallbacks
-    def update_pusher_last_token_and_success(self, app_id, pushkey,
+    def update_pusher_last_token_and_success(self, app_id, pushkey, user_name,
                                              last_token, last_success):
         yield self._simple_update_one(
             PushersTable.table_name,
-            {'app_id': app_id, 'pushkey': pushkey},
+            {'app_id': app_id, 'pushkey': pushkey, 'user_name': user_name},
             {'last_token': last_token, 'last_success': last_success},
             desc="update_pusher_last_token_and_success",
         )
 
     @defer.inlineCallbacks
-    def update_pusher_failing_since(self, app_id, pushkey, failing_since):
+    def update_pusher_failing_since(self, app_id, pushkey, user_name,
+                                    failing_since):
         yield self._simple_update_one(
             PushersTable.table_name,
-            {'app_id': app_id, 'pushkey': pushkey},
+            {'app_id': app_id, 'pushkey': pushkey, 'user_name': user_name},
             {'failing_since': failing_since},
             desc="update_pusher_failing_since",
         )
@@ -160,21 +123,3 @@ class PusherStore(SQLBaseStore):
 
 class PushersTable(Table):
     table_name = "pushers"
-
-    fields = [
-        "id",
-        "user_name",
-        "kind",
-        "profile_tag",
-        "app_id",
-        "app_display_name",
-        "device_display_name",
-        "pushkey",
-        "pushkey_ts",
-        "data",
-        "last_token",
-        "last_success",
-        "failing_since"
-    ]
-
-    EntryType = collections.namedtuple("PusherEntry", fields)
diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py
index f24154f146..a986c4816e 100644
--- a/synapse/storage/registration.py
+++ b/synapse/storage/registration.py
@@ -15,8 +15,6 @@
 
 from twisted.internet import defer
 
-from sqlite3 import IntegrityError
-
 from synapse.api.errors import StoreError, Codes
 
 from ._base import SQLBaseStore, cached
@@ -39,17 +37,13 @@ class RegistrationStore(SQLBaseStore):
         Raises:
             StoreError if there was a problem adding this.
         """
-        row = yield self._simple_select_one(
-            "users", {"name": user_id}, ["id"],
-            desc="add_access_token_to_user",
-        )
-        if not row:
-            raise StoreError(400, "Bad user ID supplied.")
-        row_id = row["id"]
+        next_id = yield self._access_tokens_id_gen.get_next()
+
         yield self._simple_insert(
             "access_tokens",
             {
-                "user_id": row_id,
+                "id": next_id,
+                "user_id": user_id,
                 "token": token
             },
             desc="add_access_token_to_user",
@@ -74,32 +68,71 @@ class RegistrationStore(SQLBaseStore):
     def _register(self, txn, user_id, token, password_hash):
         now = int(self.clock.time())
 
+        next_id = self._access_tokens_id_gen.get_next_txn(txn)
+
         try:
             txn.execute("INSERT INTO users(name, password_hash, creation_ts) "
                         "VALUES (?,?,?)",
                         [user_id, password_hash, now])
-        except IntegrityError:
+        except self.database_engine.module.IntegrityError:
             raise StoreError(
                 400, "User ID already taken.", errcode=Codes.USER_IN_USE
             )
 
         # it's possible for this to get a conflict, but only for a single user
         # since tokens are namespaced based on their user ID
-        txn.execute("INSERT INTO access_tokens(user_id, token) " +
-                    "VALUES (?,?)", [txn.lastrowid, token])
+        txn.execute(
+            "INSERT INTO access_tokens(id, user_id, token)"
+            " VALUES (?,?,?)",
+            (next_id, user_id, token,)
+        )
 
     def get_user_by_id(self, user_id):
-        query = ("SELECT users.name, users.password_hash FROM users"
-                 " WHERE users.name = ?")
-        return self._execute(
-            "get_user_by_id", self.cursor_to_dict, query, user_id
+        return self._simple_select_one(
+            table="users",
+            keyvalues={
+                "name": user_id,
+            },
+            retcols=["name", "password_hash"],
+            allow_none=True,
+        )
+
+    @defer.inlineCallbacks
+    def user_set_password_hash(self, user_id, password_hash):
+        """
+        NB. This does *not* evict any cache because the one use for this
+            removes most of the entries subsequently anyway so it would be
+            pointless. Use flush_user separately.
+        """
+        yield self._simple_update_one('users', {
+            'name': user_id
+        }, {
+            'password_hash': password_hash
+        })
+
+    @defer.inlineCallbacks
+    def user_delete_access_tokens_apart_from(self, user_id, token_id):
+        rows = yield self.get_user_by_id(user_id)
+        if len(rows) == 0:
+            raise Exception("No such user!")
+
+        yield self._execute(
+            "delete_access_tokens_apart_from", None,
+            "DELETE FROM access_tokens WHERE user_id = ? AND id != ?",
+            rows[0]['id'], token_id
+        )
+
+    @defer.inlineCallbacks
+    def flush_user(self, user_id):
+        rows = yield self._execute(
+            'flush_user', None,
+            "SELECT token FROM access_tokens WHERE user_id = ?",
+            user_id
         )
+        for r in rows:
+            self.get_user_by_token.invalidate(r)
 
     @cached()
-    # TODO(paul): Currently there's no code to invalidate this cache. That
-    #   means if/when we ever add internal ways to invalidate access tokens or
-    #   change whether a user is a server admin, those will need to invoke
-    #      store.get_user_by_token.invalidate(token)
     def get_user_by_token(self, token):
         """Get a user from the given access token.
 
@@ -134,13 +167,49 @@ class RegistrationStore(SQLBaseStore):
             "SELECT users.name, users.admin,"
             " access_tokens.device_id, access_tokens.id as token_id"
             " FROM users"
-            " INNER JOIN access_tokens on users.id = access_tokens.user_id"
+            " INNER JOIN access_tokens on users.name = access_tokens.user_id"
             " WHERE token = ?"
         )
 
-        cursor = txn.execute(sql, (token,))
-        rows = self.cursor_to_dict(cursor)
+        txn.execute(sql, (token,))
+        rows = self.cursor_to_dict(txn)
         if rows:
             return rows[0]
 
-        raise StoreError(404, "Token not found.")
+        return None
+
+    @defer.inlineCallbacks
+    def user_add_threepid(self, user_id, medium, address, validated_at, added_at):
+        yield self._simple_upsert("user_threepids", {
+            "user": user_id,
+            "medium": medium,
+            "address": address,
+        }, {
+            "validated_at": validated_at,
+            "added_at": added_at,
+        })
+
+    @defer.inlineCallbacks
+    def user_get_threepids(self, user_id):
+        ret = yield self._simple_select_list(
+            "user_threepids", {
+                "user": user_id
+            },
+            ['medium', 'address', 'validated_at', 'added_at'],
+            'user_get_threepids'
+        )
+        defer.returnValue(ret)
+
+    @defer.inlineCallbacks
+    def get_user_by_threepid(self, medium, address):
+        ret = yield self._simple_select_one(
+            "user_threepids",
+            {
+                "medium": medium,
+                "address": address
+            },
+            ['user'], True, 'get_user_by_threepid'
+        )
+        if ret:
+            defer.returnValue(ret['user'])
+        defer.returnValue(None)
diff --git a/synapse/storage/room.py b/synapse/storage/room.py
index be3e28c2ea..48ebb33057 100644
--- a/synapse/storage/room.py
+++ b/synapse/storage/room.py
@@ -72,6 +72,7 @@ class RoomStore(SQLBaseStore):
             keyvalues={"room_id": room_id},
             retcols=RoomsTable.fields,
             desc="get_room",
+            allow_none=True,
         )
 
     @defer.inlineCallbacks
@@ -102,24 +103,37 @@ class RoomStore(SQLBaseStore):
                 "ON c.event_id = room_names.event_id "
             )
 
-            # We use non printing ascii character US () as a seperator
+            # We use non printing ascii character US (\x1F) as a separator
             sql = (
-                "SELECT r.room_id, n.name, t.topic, "
-                "group_concat(a.room_alias, '') "
-                "FROM rooms AS r "
-                "LEFT JOIN (%(topic)s) AS t ON t.room_id = r.room_id "
-                "LEFT JOIN (%(name)s) AS n ON n.room_id = r.room_id "
-                "INNER JOIN room_aliases AS a ON a.room_id = r.room_id "
-                "WHERE r.is_public = ? "
-                "GROUP BY r.room_id "
+                "SELECT r.room_id, max(n.name), max(t.topic)"
+                " FROM rooms AS r"
+                " LEFT JOIN (%(topic)s) AS t ON t.room_id = r.room_id"
+                " LEFT JOIN (%(name)s) AS n ON n.room_id = r.room_id"
+                " WHERE r.is_public = ?"
+                " GROUP BY r.room_id"
             ) % {
                 "topic": topic_subquery,
                 "name": name_subquery,
             }
 
-            c = txn.execute(sql, (is_public,))
+            txn.execute(sql, (is_public,))
 
-            return c.fetchall()
+            rows = txn.fetchall()
+
+            for i, row in enumerate(rows):
+                room_id = row[0]
+                aliases = self._simple_select_onecol_txn(
+                    txn,
+                    table="room_aliases",
+                    keyvalues={
+                        "room_id": room_id
+                    },
+                    retcol="room_alias",
+                )
+
+                rows[i] = list(row) + [aliases]
+
+            return rows
 
         rows = yield self.runInteraction(
             "get_rooms", f
@@ -130,9 +144,10 @@ class RoomStore(SQLBaseStore):
                 "room_id": r[0],
                 "name": r[1],
                 "topic": r[2],
-                "aliases": r[3].split(""),
+                "aliases": r[3],
             }
             for r in rows
+            if r[3]  # We only return rooms that have at least one alias.
         ]
 
         defer.returnValue(ret)
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 52c37c76f5..8ea5756d61 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -40,7 +40,6 @@ class RoomMemberStore(SQLBaseStore):
         """
         try:
             target_user_id = event.state_key
-            domain = UserID.from_string(target_user_id).domain
         except:
             logger.exception(
                 "Failed to parse target_user_id=%s", target_user_id
@@ -65,42 +64,8 @@ class RoomMemberStore(SQLBaseStore):
             }
         )
 
-        # Update room hosts table
-        if event.membership == Membership.JOIN:
-            sql = (
-                "INSERT OR IGNORE INTO room_hosts (room_id, host) "
-                "VALUES (?, ?)"
-            )
-            txn.execute(sql, (event.room_id, domain))
-        elif event.membership != Membership.INVITE:
-            # Check if this was the last person to have left.
-            member_events = self._get_members_query_txn(
-                txn,
-                where_clause=("c.room_id = ? AND m.membership = ?"
-                              " AND m.user_id != ?"),
-                where_values=(event.room_id, Membership.JOIN, target_user_id,)
-            )
-
-            joined_domains = set()
-            for e in member_events:
-                try:
-                    joined_domains.add(
-                        UserID.from_string(e.state_key).domain
-                    )
-                except:
-                    # FIXME: How do we deal with invalid user ids in the db?
-                    logger.exception("Invalid user_id: %s", event.state_key)
-
-            if domain not in joined_domains:
-                sql = (
-                    "DELETE FROM room_hosts WHERE room_id = ? AND host = ?"
-                )
-
-                txn.execute(sql, (event.room_id, domain))
-
         self.get_rooms_for_user.invalidate(target_user_id)
 
-    @defer.inlineCallbacks
     def get_room_member(self, user_id, room_id):
         """Retrieve the current state of a room member.
 
@@ -110,41 +75,27 @@ class RoomMemberStore(SQLBaseStore):
         Returns:
             Deferred: Results in a MembershipEvent or None.
         """
-        rows = yield self._get_members_by_dict({
-            "e.room_id": room_id,
-            "m.user_id": user_id,
-        })
+        def f(txn):
+            events = self._get_members_events_txn(
+                txn,
+                room_id,
+                user_id=user_id,
+            )
 
-        defer.returnValue(rows[0] if rows else None)
+            return events[0] if events else None
 
-    def _get_room_member(self, txn, user_id, room_id):
-        sql = (
-            "SELECT e.* FROM events as e"
-            " INNER JOIN room_memberships as m"
-            " ON e.event_id = m.event_id"
-            " INNER JOIN current_state_events as c"
-            " ON m.event_id = c.event_id"
-            " WHERE m.user_id = ? and e.room_id = ?"
-            " LIMIT 1"
-        )
-        txn.execute(sql, (user_id, room_id))
-        rows = self.cursor_to_dict(txn)
-        if rows:
-            return self._parse_events_txn(txn, rows)[0]
-        else:
-            return None
+        return self.runInteraction("get_room_member", f)
 
     def get_users_in_room(self, room_id):
         def f(txn):
-            sql = (
-                "SELECT m.user_id FROM room_memberships as m"
-                " INNER JOIN current_state_events as c"
-                " ON m.event_id = c.event_id"
-                " WHERE m.membership = ? AND m.room_id = ?"
+
+            rows = self._get_members_rows_txn(
+                txn,
+                room_id=room_id,
+                membership=Membership.JOIN,
             )
 
-            txn.execute(sql, (Membership.JOIN, room_id))
-            return [r[0] for r in txn.fetchall()]
+            return [r["user_id"] for r in rows]
         return self.runInteraction("get_users_in_room", f)
 
     def get_room_members(self, room_id, membership=None):
@@ -159,11 +110,14 @@ class RoomMemberStore(SQLBaseStore):
             list of namedtuples representing the members in this room.
         """
 
-        where = {"m.room_id": room_id}
-        if membership:
-            where["m.membership"] = membership
+        def f(txn):
+            return self._get_members_events_txn(
+                txn,
+                room_id,
+                membership=membership,
+            )
 
-        return self._get_members_by_dict(where)
+        return self.runInteraction("get_room_members", f)
 
     def get_rooms_for_user_where_membership_is(self, user_id, membership_list):
         """ Get all the rooms for this user where the membership for this user
@@ -209,32 +163,55 @@ class RoomMemberStore(SQLBaseStore):
         ]
 
     def get_joined_hosts_for_room(self, room_id):
-        return self._simple_select_onecol(
-            "room_hosts",
-            {"room_id": room_id},
-            "host",
-            desc="get_joined_hosts_for_room",
+        return self.runInteraction(
+            "get_joined_hosts_for_room",
+            self._get_joined_hosts_for_room_txn,
+            room_id,
+        )
+
+    def _get_joined_hosts_for_room_txn(self, txn, room_id):
+        rows = self._get_members_rows_txn(
+            txn,
+            room_id, membership=Membership.JOIN
+        )
+
+        joined_domains = set(
+            UserID.from_string(r["user_id"]).domain
+            for r in rows
         )
 
-    def _get_members_by_dict(self, where_dict):
-        clause = " AND ".join("%s = ?" % k for k in where_dict.keys())
-        vals = where_dict.values()
-        return self._get_members_query(clause, vals)
+        return joined_domains
 
     def _get_members_query(self, where_clause, where_values):
         return self.runInteraction(
-            "get_members_query", self._get_members_query_txn,
+            "get_members_query", self._get_members_events_txn,
             where_clause, where_values
         )
 
-    def _get_members_query_txn(self, txn, where_clause, where_values):
+    def _get_members_events_txn(self, txn, room_id, membership=None, user_id=None):
+        rows = self._get_members_rows_txn(
+            txn,
+            room_id, membership, user_id,
+        )
+        return self._get_events_txn(txn, [r["event_id"] for r in rows])
+
+    def _get_members_rows_txn(self, txn, room_id, membership=None, user_id=None):
+        where_clause = "c.room_id = ?"
+        where_values = [room_id]
+
+        if membership:
+            where_clause += " AND m.membership = ?"
+            where_values.append(membership)
+
+        if user_id:
+            where_clause += " AND m.user_id = ?"
+            where_values.append(user_id)
+
         sql = (
-            "SELECT e.* FROM events as e "
-            "INNER JOIN room_memberships as m "
-            "ON e.event_id = m.event_id "
-            "INNER JOIN current_state_events as c "
-            "ON m.event_id = c.event_id "
-            "WHERE %(where)s "
+            "SELECT m.* FROM room_memberships as m"
+            " INNER JOIN current_state_events as c"
+            " ON m.event_id = c.event_id"
+            " WHERE %(where)s"
         ) % {
             "where": where_clause,
         }
@@ -242,8 +219,7 @@ class RoomMemberStore(SQLBaseStore):
         txn.execute(sql, where_values)
         rows = self.cursor_to_dict(txn)
 
-        results = self._parse_events_txn(txn, rows)
-        return results
+        return rows
 
     @cached()
     def get_rooms_for_user(self, user_id):
diff --git a/synapse/storage/schema/delta/12/v12.sql b/synapse/storage/schema/delta/12/v12.sql
index b87ef1fe79..878c36260a 100644
--- a/synapse/storage/schema/delta/12/v12.sql
+++ b/synapse/storage/schema/delta/12/v12.sql
@@ -17,26 +17,25 @@ CREATE TABLE IF NOT EXISTS rejections(
     event_id TEXT NOT NULL,
     reason TEXT NOT NULL,
     last_check TEXT NOT NULL,
-    CONSTRAINT ev_id UNIQUE (event_id) ON CONFLICT REPLACE
+    UNIQUE (event_id)
 );
 
 -- Push notification endpoints that users have configured
 CREATE TABLE IF NOT EXISTS pushers (
   id INTEGER PRIMARY KEY AUTOINCREMENT,
   user_name TEXT NOT NULL,
-  profile_tag varchar(32) NOT NULL,
-  kind varchar(8) NOT NULL,
-  app_id varchar(64) NOT NULL,
-  app_display_name varchar(64) NOT NULL,
-  device_display_name varchar(128) NOT NULL,
-  pushkey blob NOT NULL,
-  ts BIGINT NOT NULL,
-  lang varchar(8),
-  data blob,
+  profile_tag VARCHAR(32) NOT NULL,
+  kind VARCHAR(8) NOT NULL,
+  app_id VARCHAR(64) NOT NULL,
+  app_display_name VARCHAR(64) NOT NULL,
+  device_display_name VARCHAR(128) NOT NULL,
+  pushkey VARBINARY(512) NOT NULL,
+  ts BIGINT UNSIGNED NOT NULL,
+  lang VARCHAR(8),
+  data LONGBLOB,
   last_token TEXT,
-  last_success BIGINT,
-  failing_since BIGINT,
-  FOREIGN KEY(user_name) REFERENCES users(name),
+  last_success BIGINT UNSIGNED,
+  failing_since BIGINT UNSIGNED,
   UNIQUE (app_id, pushkey)
 );
 
@@ -55,13 +54,10 @@ CREATE INDEX IF NOT EXISTS push_rules_user_name on push_rules (user_name);
 
 CREATE TABLE IF NOT EXISTS user_filters(
   user_id TEXT,
-  filter_id INTEGER,
-  filter_json TEXT,
-  FOREIGN KEY(user_id) REFERENCES users(id)
+  filter_id BIGINT UNSIGNED,
+  filter_json LONGBLOB
 );
 
 CREATE INDEX IF NOT EXISTS user_filters_by_user_id_filter_id ON user_filters(
-  user_id, filter_id
+    user_id, filter_id
 );
-
-PRAGMA user_version = 12;
diff --git a/synapse/storage/schema/delta/13/v13.sql b/synapse/storage/schema/delta/13/v13.sql
index e491ad5aec..3265924013 100644
--- a/synapse/storage/schema/delta/13/v13.sql
+++ b/synapse/storage/schema/delta/13/v13.sql
@@ -19,16 +19,13 @@ CREATE TABLE IF NOT EXISTS application_services(
     token TEXT,
     hs_token TEXT,
     sender TEXT,
-    UNIQUE(token) ON CONFLICT ROLLBACK
+    UNIQUE(token)
 );
 
 CREATE TABLE IF NOT EXISTS application_services_regex(
     id INTEGER PRIMARY KEY AUTOINCREMENT,
-    as_id INTEGER NOT NULL,
+    as_id BIGINT UNSIGNED NOT NULL,
     namespace INTEGER,  /* enum[room_id|room_alias|user_id] */
     regex TEXT,
     FOREIGN KEY(as_id) REFERENCES application_services(id)
 );
-
-
-
diff --git a/synapse/storage/schema/delta/15/appservice_txns.sql b/synapse/storage/schema/delta/15/appservice_txns.sql
index 2b27e2a429..db2e720393 100644
--- a/synapse/storage/schema/delta/15/appservice_txns.sql
+++ b/synapse/storage/schema/delta/15/appservice_txns.sql
@@ -15,16 +15,17 @@
 
 CREATE TABLE IF NOT EXISTS application_services_state(
     as_id TEXT PRIMARY KEY,
-    state TEXT,
-    last_txn TEXT
+    state VARCHAR(5),
+    last_txn INTEGER
 );
 
 CREATE TABLE IF NOT EXISTS application_services_txns(
     as_id TEXT NOT NULL,
     txn_id INTEGER NOT NULL,
     event_ids TEXT NOT NULL,
-    UNIQUE(as_id, txn_id) ON CONFLICT ROLLBACK
+    UNIQUE(as_id, txn_id)
 );
 
-
-
+CREATE INDEX IF NOT EXISTS application_services_txns_id ON application_services_txns (
+    as_id
+);
diff --git a/synapse/storage/schema/delta/15/presence_indices.sql b/synapse/storage/schema/delta/15/presence_indices.sql
new file mode 100644
index 0000000000..6b8d0f1ca7
--- /dev/null
+++ b/synapse/storage/schema/delta/15/presence_indices.sql
@@ -0,0 +1,2 @@
+
+CREATE INDEX IF NOT EXISTS presence_list_user_id ON presence_list (user_id);
diff --git a/synapse/storage/schema/delta/15/v15.sql b/synapse/storage/schema/delta/15/v15.sql
new file mode 100644
index 0000000000..f5b2a08ca4
--- /dev/null
+++ b/synapse/storage/schema/delta/15/v15.sql
@@ -0,0 +1,25 @@
+-- Drop, copy & recreate pushers table to change unique key
+-- Also add access_token column at the same time
+CREATE TABLE IF NOT EXISTS pushers2 (
+  id INTEGER PRIMARY KEY AUTOINCREMENT,
+  user_name TEXT NOT NULL,
+  access_token INTEGER DEFAULT NULL,
+  profile_tag varchar(32) NOT NULL,
+  kind varchar(8) NOT NULL,
+  app_id varchar(64) NOT NULL,
+  app_display_name varchar(64) NOT NULL,
+  device_display_name varchar(128) NOT NULL,
+  pushkey blob NOT NULL,
+  ts BIGINT NOT NULL,
+  lang varchar(8),
+  data blob,
+  last_token TEXT,
+  last_success BIGINT,
+  failing_since BIGINT,
+  FOREIGN KEY(user_name) REFERENCES users(name),
+  UNIQUE (app_id, pushkey, user_name)
+);
+INSERT INTO pushers2 (id, user_name, profile_tag, kind, app_id, app_display_name, device_display_name, pushkey, ts, lang, data, last_token, last_success, failing_since)
+  SELECT id, user_name, profile_tag, kind, app_id, app_display_name, device_display_name, pushkey, ts, lang, data, last_token, last_success, failing_since FROM pushers;
+DROP TABLE pushers;
+ALTER TABLE pushers2 RENAME TO pushers;
diff --git a/synapse/storage/schema/delta/16/events_order_index.sql b/synapse/storage/schema/delta/16/events_order_index.sql
new file mode 100644
index 0000000000..a48f215170
--- /dev/null
+++ b/synapse/storage/schema/delta/16/events_order_index.sql
@@ -0,0 +1,4 @@
+CREATE INDEX events_order ON events (topological_ordering, stream_ordering);
+CREATE INDEX events_order_room ON events (
+    room_id, topological_ordering, stream_ordering
+);
diff --git a/synapse/storage/schema/delta/16/remote_media_cache_index.sql b/synapse/storage/schema/delta/16/remote_media_cache_index.sql
new file mode 100644
index 0000000000..7a15265cb1
--- /dev/null
+++ b/synapse/storage/schema/delta/16/remote_media_cache_index.sql
@@ -0,0 +1,2 @@
+CREATE INDEX IF NOT EXISTS remote_media_cache_thumbnails_media_id
+    ON remote_media_cache_thumbnails (media_id);
\ No newline at end of file
diff --git a/synapse/storage/schema/delta/16/remove_duplicates.sql b/synapse/storage/schema/delta/16/remove_duplicates.sql
new file mode 100644
index 0000000000..65c97b5e2f
--- /dev/null
+++ b/synapse/storage/schema/delta/16/remove_duplicates.sql
@@ -0,0 +1,9 @@
+
+
+DELETE FROM event_to_state_groups WHERE state_group not in (
+    SELECT MAX(state_group) FROM event_to_state_groups GROUP BY event_id
+);
+
+DELETE FROM event_to_state_groups WHERE rowid not in (
+    SELECT MIN(rowid) FROM event_to_state_groups GROUP BY event_id
+);
diff --git a/synapse/storage/schema/delta/16/room_alias_index.sql b/synapse/storage/schema/delta/16/room_alias_index.sql
new file mode 100644
index 0000000000..f82486132b
--- /dev/null
+++ b/synapse/storage/schema/delta/16/room_alias_index.sql
@@ -0,0 +1,3 @@
+
+CREATE INDEX IF NOT EXISTS room_aliases_id ON room_aliases(room_id);
+CREATE INDEX IF NOT EXISTS room_alias_servers_alias ON room_alias_servers(room_alias);
diff --git a/synapse/storage/schema/delta/16/unique_constraints.sql b/synapse/storage/schema/delta/16/unique_constraints.sql
new file mode 100644
index 0000000000..fecf11118c
--- /dev/null
+++ b/synapse/storage/schema/delta/16/unique_constraints.sql
@@ -0,0 +1,80 @@
+
+-- We can use SQLite features here, since other db support was only added in v16
+
+--
+DELETE FROM current_state_events WHERE rowid not in (
+    SELECT MIN(rowid) FROM current_state_events GROUP BY event_id
+);
+
+DROP INDEX IF EXISTS current_state_events_event_id;
+CREATE UNIQUE INDEX current_state_events_event_id ON current_state_events(event_id);
+
+--
+DELETE FROM room_memberships WHERE rowid not in (
+    SELECT MIN(rowid) FROM room_memberships GROUP BY event_id
+);
+
+DROP INDEX IF EXISTS room_memberships_event_id;
+CREATE UNIQUE INDEX room_memberships_event_id ON room_memberships(event_id);
+
+--
+DELETE FROM feedback WHERE rowid not in (
+    SELECT MIN(rowid) FROM feedback GROUP BY event_id
+);
+
+DROP INDEX IF EXISTS feedback_event_id;
+CREATE UNIQUE INDEX feedback_event_id ON feedback(event_id);
+
+--
+DELETE FROM topics WHERE rowid not in (
+    SELECT MIN(rowid) FROM topics GROUP BY event_id
+);
+
+DROP INDEX IF EXISTS topics_event_id;
+CREATE UNIQUE INDEX topics_event_id ON topics(event_id);
+
+--
+DELETE FROM room_names WHERE rowid not in (
+    SELECT MIN(rowid) FROM room_names GROUP BY event_id
+);
+
+DROP INDEX IF EXISTS room_names_id;
+CREATE UNIQUE INDEX room_names_id ON room_names(event_id);
+
+--
+DELETE FROM presence WHERE rowid not in (
+    SELECT MIN(rowid) FROM presence GROUP BY user_id
+);
+
+DROP INDEX IF EXISTS presence_id;
+CREATE UNIQUE INDEX presence_id ON presence(user_id);
+
+--
+DELETE FROM presence_allow_inbound WHERE rowid not in (
+    SELECT MIN(rowid) FROM presence_allow_inbound
+    GROUP BY observed_user_id, observer_user_id
+);
+
+DROP INDEX IF EXISTS presence_allow_inbound_observers;
+CREATE UNIQUE INDEX presence_allow_inbound_observers ON presence_allow_inbound(
+    observed_user_id, observer_user_id
+);
+
+--
+DELETE FROM presence_list WHERE rowid not in (
+    SELECT MIN(rowid) FROM presence_list
+    GROUP BY user_id, observed_user_id
+);
+
+DROP INDEX IF EXISTS presence_list_observers;
+CREATE UNIQUE INDEX presence_list_observers ON presence_list(
+    user_id, observed_user_id
+);
+
+--
+DELETE FROM room_aliases WHERE rowid not in (
+    SELECT MIN(rowid) FROM room_aliases GROUP BY room_alias
+);
+
+DROP INDEX IF EXISTS room_aliases_id;
+CREATE INDEX room_aliases_id ON room_aliases(room_id);
diff --git a/synapse/storage/schema/delta/16/users.sql b/synapse/storage/schema/delta/16/users.sql
new file mode 100644
index 0000000000..cd0709250d
--- /dev/null
+++ b/synapse/storage/schema/delta/16/users.sql
@@ -0,0 +1,56 @@
+-- Convert `access_tokens`.user from rowids to user strings.
+-- MUST BE DONE BEFORE REMOVING ID COLUMN FROM USERS TABLE BELOW
+CREATE TABLE IF NOT EXISTS new_access_tokens(
+    id BIGINT UNSIGNED PRIMARY KEY,
+    user_id TEXT NOT NULL,
+    device_id TEXT,
+    token TEXT NOT NULL,
+    last_used BIGINT UNSIGNED,
+    UNIQUE(token)
+);
+
+INSERT INTO new_access_tokens
+    SELECT a.id, u.name, a.device_id, a.token, a.last_used
+    FROM access_tokens as a
+    INNER JOIN users as u ON u.id = a.user_id;
+
+DROP TABLE access_tokens;
+
+ALTER TABLE new_access_tokens RENAME TO access_tokens;
+
+-- Remove ID column from `users` table
+CREATE TABLE IF NOT EXISTS new_users(
+    name TEXT,
+    password_hash TEXT,
+    creation_ts BIGINT UNSIGNED,
+    admin BOOL DEFAULT 0 NOT NULL,
+    UNIQUE(name)
+);
+
+INSERT INTO new_users SELECT name, password_hash, creation_ts, admin FROM users;
+
+DROP TABLE users;
+
+ALTER TABLE new_users RENAME TO users;
+
+
+-- Remove UNIQUE constraint from `user_ips` table
+CREATE TABLE IF NOT EXISTS new_user_ips (
+    user_id TEXT NOT NULL,
+    access_token TEXT NOT NULL,
+    device_id TEXT,
+    ip TEXT NOT NULL,
+    user_agent TEXT NOT NULL,
+    last_seen BIGINT UNSIGNED NOT NULL
+);
+
+INSERT INTO new_user_ips
+    SELECT user, access_token, device_id, ip, user_agent, last_seen FROM user_ips;
+
+DROP TABLE user_ips;
+
+ALTER TABLE new_user_ips RENAME TO user_ips;
+
+CREATE INDEX IF NOT EXISTS user_ips_user ON user_ips(user_id);
+CREATE INDEX IF NOT EXISTS user_ips_user_ip ON user_ips(user_id, access_token, ip);
+
diff --git a/synapse/storage/schema/full_schemas/11/event_edges.sql b/synapse/storage/schema/full_schemas/11/event_edges.sql
index 1e766d6db2..f7020f7793 100644
--- a/synapse/storage/schema/full_schemas/11/event_edges.sql
+++ b/synapse/storage/schema/full_schemas/11/event_edges.sql
@@ -16,52 +16,52 @@
 CREATE TABLE IF NOT EXISTS event_forward_extremities(
     event_id TEXT NOT NULL,
     room_id TEXT NOT NULL,
-    CONSTRAINT uniqueness UNIQUE (event_id, room_id) ON CONFLICT REPLACE
+    UNIQUE (event_id, room_id)
 );
 
-CREATE INDEX IF NOT EXISTS ev_extrem_room ON event_forward_extremities(room_id);
-CREATE INDEX IF NOT EXISTS ev_extrem_id ON event_forward_extremities(event_id);
+CREATE INDEX ev_extrem_room ON event_forward_extremities(room_id);
+CREATE INDEX ev_extrem_id ON event_forward_extremities(event_id);
 
 
 CREATE TABLE IF NOT EXISTS event_backward_extremities(
     event_id TEXT NOT NULL,
     room_id TEXT NOT NULL,
-    CONSTRAINT uniqueness UNIQUE (event_id, room_id) ON CONFLICT REPLACE
+    UNIQUE (event_id, room_id)
 );
 
-CREATE INDEX IF NOT EXISTS ev_b_extrem_room ON event_backward_extremities(room_id);
-CREATE INDEX IF NOT EXISTS ev_b_extrem_id ON event_backward_extremities(event_id);
+CREATE INDEX ev_b_extrem_room ON event_backward_extremities(room_id);
+CREATE INDEX ev_b_extrem_id ON event_backward_extremities(event_id);
 
 
 CREATE TABLE IF NOT EXISTS event_edges(
     event_id TEXT NOT NULL,
     prev_event_id TEXT NOT NULL,
     room_id TEXT NOT NULL,
-    is_state INTEGER NOT NULL,
-    CONSTRAINT uniqueness UNIQUE (event_id, prev_event_id, room_id, is_state)
+    is_state BOOL NOT NULL,
+    UNIQUE (event_id, prev_event_id, room_id, is_state)
 );
 
-CREATE INDEX IF NOT EXISTS ev_edges_id ON event_edges(event_id);
-CREATE INDEX IF NOT EXISTS ev_edges_prev_id ON event_edges(prev_event_id);
+CREATE INDEX ev_edges_id ON event_edges(event_id);
+CREATE INDEX ev_edges_prev_id ON event_edges(prev_event_id);
 
 
 CREATE TABLE IF NOT EXISTS room_depth(
     room_id TEXT NOT NULL,
     min_depth INTEGER NOT NULL,
-    CONSTRAINT uniqueness UNIQUE (room_id)
+    UNIQUE (room_id)
 );
 
-CREATE INDEX IF NOT EXISTS room_depth_room ON room_depth(room_id);
+CREATE INDEX room_depth_room ON room_depth(room_id);
 
 
 create TABLE IF NOT EXISTS event_destinations(
     event_id TEXT NOT NULL,
     destination TEXT NOT NULL,
-    delivered_ts INTEGER DEFAULT 0, -- or 0 if not delivered
-    CONSTRAINT uniqueness UNIQUE (event_id, destination) ON CONFLICT REPLACE
+    delivered_ts BIGINT DEFAULT 0, -- or 0 if not delivered
+    UNIQUE (event_id, destination)
 );
 
-CREATE INDEX IF NOT EXISTS event_destinations_id ON event_destinations(event_id);
+CREATE INDEX event_destinations_id ON event_destinations(event_id);
 
 
 CREATE TABLE IF NOT EXISTS state_forward_extremities(
@@ -69,21 +69,21 @@ CREATE TABLE IF NOT EXISTS state_forward_extremities(
     room_id TEXT NOT NULL,
     type TEXT NOT NULL,
     state_key TEXT NOT NULL,
-    CONSTRAINT uniqueness UNIQUE (event_id, room_id) ON CONFLICT REPLACE
+    UNIQUE (event_id, room_id)
 );
 
-CREATE INDEX IF NOT EXISTS st_extrem_keys ON state_forward_extremities(
+CREATE INDEX st_extrem_keys ON state_forward_extremities(
     room_id, type, state_key
 );
-CREATE INDEX IF NOT EXISTS st_extrem_id ON state_forward_extremities(event_id);
+CREATE INDEX st_extrem_id ON state_forward_extremities(event_id);
 
 
 CREATE TABLE IF NOT EXISTS event_auth(
     event_id TEXT NOT NULL,
     auth_id TEXT NOT NULL,
     room_id TEXT NOT NULL,
-    CONSTRAINT uniqueness UNIQUE (event_id, auth_id, room_id)
+    UNIQUE (event_id, auth_id, room_id)
 );
 
-CREATE INDEX IF NOT EXISTS evauth_edges_id ON event_auth(event_id);
-CREATE INDEX IF NOT EXISTS evauth_edges_auth_id ON event_auth(auth_id);
\ No newline at end of file
+CREATE INDEX evauth_edges_id ON event_auth(event_id);
+CREATE INDEX evauth_edges_auth_id ON event_auth(auth_id);
diff --git a/synapse/storage/schema/full_schemas/11/event_signatures.sql b/synapse/storage/schema/full_schemas/11/event_signatures.sql
index c28c39c48a..636b2d3353 100644
--- a/synapse/storage/schema/full_schemas/11/event_signatures.sql
+++ b/synapse/storage/schema/full_schemas/11/event_signatures.sql
@@ -16,50 +16,40 @@
 CREATE TABLE IF NOT EXISTS event_content_hashes (
     event_id TEXT,
     algorithm TEXT,
-    hash BLOB,
-    CONSTRAINT uniqueness UNIQUE (event_id, algorithm)
+    hash bytea,
+    UNIQUE (event_id, algorithm)
 );
 
-CREATE INDEX IF NOT EXISTS event_content_hashes_id ON event_content_hashes(
-    event_id
-);
+CREATE INDEX event_content_hashes_id ON event_content_hashes(event_id);
 
 
 CREATE TABLE IF NOT EXISTS event_reference_hashes (
     event_id TEXT,
     algorithm TEXT,
-    hash BLOB,
-    CONSTRAINT uniqueness UNIQUE (event_id, algorithm)
+    hash bytea,
+    UNIQUE (event_id, algorithm)
 );
 
-CREATE INDEX IF NOT EXISTS event_reference_hashes_id ON event_reference_hashes (
-    event_id
-);
+CREATE INDEX event_reference_hashes_id ON event_reference_hashes(event_id);
 
 
 CREATE TABLE IF NOT EXISTS event_signatures (
     event_id TEXT,
     signature_name TEXT,
     key_id TEXT,
-    signature BLOB,
-    CONSTRAINT uniqueness UNIQUE (event_id, signature_name, key_id)
+    signature bytea,
+    UNIQUE (event_id, signature_name, key_id)
 );
 
-CREATE INDEX IF NOT EXISTS event_signatures_id ON event_signatures (
-    event_id
-);
+CREATE INDEX event_signatures_id ON event_signatures(event_id);
 
 
 CREATE TABLE IF NOT EXISTS event_edge_hashes(
     event_id TEXT,
     prev_event_id TEXT,
     algorithm TEXT,
-    hash BLOB,
-    CONSTRAINT uniqueness UNIQUE (
-        event_id, prev_event_id, algorithm
-    )
+    hash bytea,
+    UNIQUE (event_id, prev_event_id, algorithm)
 );
 
-CREATE INDEX IF NOT EXISTS event_edge_hashes_id ON event_edge_hashes(
-    event_id
-);
+CREATE INDEX event_edge_hashes_id ON event_edge_hashes(event_id);
diff --git a/synapse/storage/schema/full_schemas/11/im.sql b/synapse/storage/schema/full_schemas/11/im.sql
index dd00c1cd2f..1901654ac2 100644
--- a/synapse/storage/schema/full_schemas/11/im.sql
+++ b/synapse/storage/schema/full_schemas/11/im.sql
@@ -15,7 +15,7 @@
 
 CREATE TABLE IF NOT EXISTS events(
     stream_ordering INTEGER PRIMARY KEY AUTOINCREMENT,
-    topological_ordering INTEGER NOT NULL,
+    topological_ordering BIGINT NOT NULL,
     event_id TEXT NOT NULL,
     type TEXT NOT NULL,
     room_id TEXT NOT NULL,
@@ -23,26 +23,24 @@ CREATE TABLE IF NOT EXISTS events(
     unrecognized_keys TEXT,
     processed BOOL NOT NULL,
     outlier BOOL NOT NULL,
-    depth INTEGER DEFAULT 0 NOT NULL,
-    CONSTRAINT ev_uniq UNIQUE (event_id)
+    depth BIGINT DEFAULT 0 NOT NULL,
+    UNIQUE (event_id)
 );
 
-CREATE INDEX IF NOT EXISTS events_event_id ON events (event_id);
-CREATE INDEX IF NOT EXISTS events_stream_ordering ON events (stream_ordering);
-CREATE INDEX IF NOT EXISTS events_topological_ordering ON events (topological_ordering);
-CREATE INDEX IF NOT EXISTS events_room_id ON events (room_id);
+CREATE INDEX events_stream_ordering ON events (stream_ordering);
+CREATE INDEX events_topological_ordering ON events (topological_ordering);
+CREATE INDEX events_room_id ON events (room_id);
 
 
 CREATE TABLE IF NOT EXISTS event_json(
     event_id TEXT NOT NULL,
     room_id TEXT NOT NULL,
-    internal_metadata NOT NULL,
-    json BLOB NOT NULL,
-    CONSTRAINT ev_j_uniq UNIQUE (event_id)
+    internal_metadata TEXT NOT NULL,
+    json TEXT NOT NULL,
+    UNIQUE (event_id)
 );
 
-CREATE INDEX IF NOT EXISTS event_json_id ON event_json(event_id);
-CREATE INDEX IF NOT EXISTS event_json_room_id ON event_json(room_id);
+CREATE INDEX event_json_room_id ON event_json(room_id);
 
 
 CREATE TABLE IF NOT EXISTS state_events(
@@ -50,13 +48,13 @@ CREATE TABLE IF NOT EXISTS state_events(
     room_id TEXT NOT NULL,
     type TEXT NOT NULL,
     state_key TEXT NOT NULL,
-    prev_state TEXT
+    prev_state TEXT,
+    UNIQUE (event_id)
 );
 
-CREATE UNIQUE INDEX IF NOT EXISTS state_events_event_id ON state_events (event_id);
-CREATE INDEX IF NOT EXISTS state_events_room_id ON state_events (room_id);
-CREATE INDEX IF NOT EXISTS state_events_type ON state_events (type);
-CREATE INDEX IF NOT EXISTS state_events_state_key ON state_events (state_key);
+CREATE INDEX state_events_room_id ON state_events (room_id);
+CREATE INDEX state_events_type ON state_events (type);
+CREATE INDEX state_events_state_key ON state_events (state_key);
 
 
 CREATE TABLE IF NOT EXISTS current_state_events(
@@ -64,13 +62,13 @@ CREATE TABLE IF NOT EXISTS current_state_events(
     room_id TEXT NOT NULL,
     type TEXT NOT NULL,
     state_key TEXT NOT NULL,
-    CONSTRAINT curr_uniq UNIQUE (room_id, type, state_key) ON CONFLICT REPLACE
+    UNIQUE (room_id, type, state_key)
 );
 
-CREATE INDEX IF NOT EXISTS curr_events_event_id ON current_state_events (event_id);
-CREATE INDEX IF NOT EXISTS current_state_events_room_id ON current_state_events (room_id);
-CREATE INDEX IF NOT EXISTS current_state_events_type ON current_state_events (type);
-CREATE INDEX IF NOT EXISTS current_state_events_state_key ON current_state_events (state_key);
+CREATE INDEX curr_events_event_id ON current_state_events (event_id);
+CREATE INDEX current_state_events_room_id ON current_state_events (room_id);
+CREATE INDEX current_state_events_type ON current_state_events (type);
+CREATE INDEX current_state_events_state_key ON current_state_events (state_key);
 
 CREATE TABLE IF NOT EXISTS room_memberships(
     event_id TEXT NOT NULL,
@@ -80,9 +78,9 @@ CREATE TABLE IF NOT EXISTS room_memberships(
     membership TEXT NOT NULL
 );
 
-CREATE INDEX IF NOT EXISTS room_memberships_event_id ON room_memberships (event_id);
-CREATE INDEX IF NOT EXISTS room_memberships_room_id ON room_memberships (room_id);
-CREATE INDEX IF NOT EXISTS room_memberships_user_id ON room_memberships (user_id);
+CREATE INDEX room_memberships_event_id ON room_memberships (event_id);
+CREATE INDEX room_memberships_room_id ON room_memberships (room_id);
+CREATE INDEX room_memberships_user_id ON room_memberships (user_id);
 
 CREATE TABLE IF NOT EXISTS feedback(
     event_id TEXT NOT NULL,
@@ -98,8 +96,8 @@ CREATE TABLE IF NOT EXISTS topics(
     topic TEXT NOT NULL
 );
 
-CREATE INDEX IF NOT EXISTS topics_event_id ON topics(event_id);
-CREATE INDEX IF NOT EXISTS topics_room_id ON topics(room_id);
+CREATE INDEX topics_event_id ON topics(event_id);
+CREATE INDEX topics_room_id ON topics(room_id);
 
 CREATE TABLE IF NOT EXISTS room_names(
     event_id TEXT NOT NULL,
@@ -107,19 +105,19 @@ CREATE TABLE IF NOT EXISTS room_names(
     name TEXT NOT NULL
 );
 
-CREATE INDEX IF NOT EXISTS room_names_event_id ON room_names(event_id);
-CREATE INDEX IF NOT EXISTS room_names_room_id ON room_names(room_id);
+CREATE INDEX room_names_event_id ON room_names(event_id);
+CREATE INDEX room_names_room_id ON room_names(room_id);
 
 CREATE TABLE IF NOT EXISTS rooms(
     room_id TEXT PRIMARY KEY NOT NULL,
-    is_public INTEGER,
+    is_public BOOL,
     creator TEXT
 );
 
 CREATE TABLE IF NOT EXISTS room_hosts(
     room_id TEXT NOT NULL,
     host TEXT NOT NULL,
-    CONSTRAINT room_hosts_uniq UNIQUE (room_id, host) ON CONFLICT IGNORE
+    UNIQUE (room_id, host)
 );
 
-CREATE INDEX IF NOT EXISTS room_hosts_room_id ON room_hosts (room_id);
+CREATE INDEX room_hosts_room_id ON room_hosts (room_id);
diff --git a/synapse/storage/schema/full_schemas/11/keys.sql b/synapse/storage/schema/full_schemas/11/keys.sql
index a9e0a4fe0d..afc142045e 100644
--- a/synapse/storage/schema/full_schemas/11/keys.sql
+++ b/synapse/storage/schema/full_schemas/11/keys.sql
@@ -16,16 +16,16 @@ CREATE TABLE IF NOT EXISTS server_tls_certificates(
   server_name TEXT, -- Server name.
   fingerprint TEXT, -- Certificate fingerprint.
   from_server TEXT, -- Which key server the certificate was fetched from.
-  ts_added_ms INTEGER, -- When the certifcate was added.
-  tls_certificate BLOB, -- DER encoded x509 certificate.
-  CONSTRAINT uniqueness UNIQUE (server_name, fingerprint)
+  ts_added_ms BIGINT, -- When the certifcate was added.
+  tls_certificate bytea, -- DER encoded x509 certificate.
+  UNIQUE (server_name, fingerprint)
 );
 
 CREATE TABLE IF NOT EXISTS server_signature_keys(
   server_name TEXT, -- Server name.
   key_id TEXT, -- Key version.
   from_server TEXT, -- Which key server the key was fetched form.
-  ts_added_ms INTEGER, -- When the key was added.
-  verify_key BLOB, -- NACL verification key.
-  CONSTRAINT uniqueness UNIQUE (server_name, key_id)
+  ts_added_ms BIGINT, -- When the key was added.
+  verify_key bytea, -- NACL verification key.
+  UNIQUE (server_name, key_id)
 );
diff --git a/synapse/storage/schema/full_schemas/11/media_repository.sql b/synapse/storage/schema/full_schemas/11/media_repository.sql
index afdf48cbfb..e927e581d1 100644
--- a/synapse/storage/schema/full_schemas/11/media_repository.sql
+++ b/synapse/storage/schema/full_schemas/11/media_repository.sql
@@ -17,10 +17,10 @@ CREATE TABLE IF NOT EXISTS local_media_repository (
     media_id TEXT, -- The id used to refer to the media.
     media_type TEXT, -- The MIME-type of the media.
     media_length INTEGER, -- Length of the media in bytes.
-    created_ts INTEGER, -- When the content was uploaded in ms.
+    created_ts BIGINT, -- When the content was uploaded in ms.
     upload_name TEXT, -- The name the media was uploaded with.
     user_id TEXT, -- The user who uploaded the file.
-    CONSTRAINT uniqueness UNIQUE (media_id)
+    UNIQUE (media_id)
 );
 
 CREATE TABLE IF NOT EXISTS local_media_repository_thumbnails (
@@ -30,23 +30,23 @@ CREATE TABLE IF NOT EXISTS local_media_repository_thumbnails (
     thumbnail_type TEXT, -- The MIME-type of the thumbnail.
     thumbnail_method TEXT, -- The method used to make the thumbnail.
     thumbnail_length INTEGER, -- The length of the thumbnail in bytes.
-    CONSTRAINT uniqueness UNIQUE (
+    UNIQUE (
         media_id, thumbnail_width, thumbnail_height, thumbnail_type
     )
 );
 
-CREATE INDEX IF NOT EXISTS local_media_repository_thumbnails_media_id
+CREATE INDEX local_media_repository_thumbnails_media_id
     ON local_media_repository_thumbnails (media_id);
 
 CREATE TABLE IF NOT EXISTS remote_media_cache (
     media_origin TEXT, -- The remote HS the media came from.
     media_id TEXT, -- The id used to refer to the media on that server.
     media_type TEXT, -- The MIME-type of the media.
-    created_ts INTEGER, -- When the content was uploaded in ms.
+    created_ts BIGINT, -- When the content was uploaded in ms.
     upload_name TEXT, -- The name the media was uploaded with.
     media_length INTEGER, -- Length of the media in bytes.
     filesystem_id TEXT, -- The name used to store the media on disk.
-    CONSTRAINT uniqueness UNIQUE (media_origin, media_id)
+    UNIQUE (media_origin, media_id)
 );
 
 CREATE TABLE IF NOT EXISTS remote_media_cache_thumbnails (
@@ -58,11 +58,8 @@ CREATE TABLE IF NOT EXISTS remote_media_cache_thumbnails (
     thumbnail_type TEXT, -- The MIME-type of the thumbnail.
     thumbnail_length INTEGER, -- The length of the thumbnail in bytes.
     filesystem_id TEXT, -- The name used to store the media on disk.
-    CONSTRAINT uniqueness UNIQUE (
+    UNIQUE (
         media_origin, media_id, thumbnail_width, thumbnail_height,
-        thumbnail_type, thumbnail_type
-    )
+        thumbnail_type
+     )
 );
-
-CREATE INDEX IF NOT EXISTS remote_media_cache_thumbnails_media_id
-    ON local_media_repository_thumbnails (media_id);
diff --git a/synapse/storage/schema/full_schemas/11/presence.sql b/synapse/storage/schema/full_schemas/11/presence.sql
index f9f8db9697..d8d82e9fe3 100644
--- a/synapse/storage/schema/full_schemas/11/presence.sql
+++ b/synapse/storage/schema/full_schemas/11/presence.sql
@@ -13,26 +13,23 @@
  * limitations under the License.
  */
 CREATE TABLE IF NOT EXISTS presence(
-  user_id INTEGER NOT NULL,
-  state INTEGER,
+  user_id TEXT NOT NULL,
+  state VARCHAR(20),
   status_msg TEXT,
-  mtime INTEGER, -- miliseconds since last state change
-  FOREIGN KEY(user_id) REFERENCES users(id)
+  mtime BIGINT -- miliseconds since last state change
 );
 
 -- For each of /my/ users which possibly-remote users are allowed to see their
 -- presence state
 CREATE TABLE IF NOT EXISTS presence_allow_inbound(
-  observed_user_id INTEGER NOT NULL,
-  observer_user_id TEXT, -- a UserID,
-  FOREIGN KEY(observed_user_id) REFERENCES users(id)
+  observed_user_id TEXT NOT NULL,
+  observer_user_id TEXT NOT NULL -- a UserID,
 );
 
 -- For each of /my/ users (watcher), which possibly-remote users are they
 -- watching?
 CREATE TABLE IF NOT EXISTS presence_list(
-  user_id INTEGER NOT NULL,
-  observed_user_id TEXT, -- a UserID,
-  accepted BOOLEAN,
-  FOREIGN KEY(user_id) REFERENCES users(id)
+  user_id TEXT NOT NULL,
+  observed_user_id TEXT NOT NULL, -- a UserID,
+  accepted BOOLEAN NOT NULL
 );
diff --git a/synapse/storage/schema/full_schemas/11/profiles.sql b/synapse/storage/schema/full_schemas/11/profiles.sql
index f06a528b4d..26e4204437 100644
--- a/synapse/storage/schema/full_schemas/11/profiles.sql
+++ b/synapse/storage/schema/full_schemas/11/profiles.sql
@@ -13,8 +13,7 @@
  * limitations under the License.
  */
 CREATE TABLE IF NOT EXISTS profiles(
-    user_id INTEGER NOT NULL,
+    user_id TEXT NOT NULL,
     displayname TEXT,
-    avatar_url TEXT,
-    FOREIGN KEY(user_id) REFERENCES users(id)
+    avatar_url TEXT
 );
diff --git a/synapse/storage/schema/full_schemas/11/redactions.sql b/synapse/storage/schema/full_schemas/11/redactions.sql
index 5011d95db8..69621955d4 100644
--- a/synapse/storage/schema/full_schemas/11/redactions.sql
+++ b/synapse/storage/schema/full_schemas/11/redactions.sql
@@ -15,8 +15,8 @@
 CREATE TABLE IF NOT EXISTS redactions (
     event_id TEXT NOT NULL,
     redacts TEXT NOT NULL,
-    CONSTRAINT ev_uniq UNIQUE (event_id)
+    UNIQUE (event_id)
 );
 
-CREATE INDEX IF NOT EXISTS redactions_event_id ON redactions (event_id);
-CREATE INDEX IF NOT EXISTS redactions_redacts ON redactions (redacts);
+CREATE INDEX redactions_event_id ON redactions (event_id);
+CREATE INDEX redactions_redacts ON redactions (redacts);
diff --git a/synapse/storage/schema/full_schemas/11/room_aliases.sql b/synapse/storage/schema/full_schemas/11/room_aliases.sql
index 0d2df01603..5027b1e3f6 100644
--- a/synapse/storage/schema/full_schemas/11/room_aliases.sql
+++ b/synapse/storage/schema/full_schemas/11/room_aliases.sql
@@ -22,6 +22,3 @@ CREATE TABLE IF NOT EXISTS room_alias_servers(
     room_alias TEXT NOT NULL,
     server TEXT NOT NULL
 );
-
-
-
diff --git a/synapse/storage/schema/full_schemas/11/state.sql b/synapse/storage/schema/full_schemas/11/state.sql
index 1fe8f1e430..ffd164ab71 100644
--- a/synapse/storage/schema/full_schemas/11/state.sql
+++ b/synapse/storage/schema/full_schemas/11/state.sql
@@ -30,18 +30,11 @@ CREATE TABLE IF NOT EXISTS state_groups_state(
 CREATE TABLE IF NOT EXISTS event_to_state_groups(
     event_id TEXT NOT NULL,
     state_group INTEGER NOT NULL,
-    CONSTRAINT event_to_state_groups_uniq UNIQUE (event_id)
+    UNIQUE (event_id)
 );
 
-CREATE INDEX IF NOT EXISTS state_groups_id ON state_groups(id);
+CREATE INDEX state_groups_id ON state_groups(id);
 
-CREATE INDEX IF NOT EXISTS state_groups_state_id ON state_groups_state(
-    state_group
-);
-CREATE INDEX IF NOT EXISTS state_groups_state_tuple ON state_groups_state(
-    room_id, type, state_key
-);
-
-CREATE INDEX IF NOT EXISTS event_to_state_groups_id ON event_to_state_groups(
-    event_id
-);
\ No newline at end of file
+CREATE INDEX state_groups_state_id ON state_groups_state(state_group);
+CREATE INDEX state_groups_state_tuple ON state_groups_state(room_id, type, state_key);
+CREATE INDEX event_to_state_groups_id ON event_to_state_groups(event_id);
diff --git a/synapse/storage/schema/full_schemas/11/transactions.sql b/synapse/storage/schema/full_schemas/11/transactions.sql
index 2d30f99b06..cc5b54f5aa 100644
--- a/synapse/storage/schema/full_schemas/11/transactions.sql
+++ b/synapse/storage/schema/full_schemas/11/transactions.sql
@@ -14,17 +14,16 @@
  */
 -- Stores what transaction ids we have received and what our response was
 CREATE TABLE IF NOT EXISTS received_transactions(
-    transaction_id TEXT, 
-    origin TEXT, 
-    ts INTEGER,
+    transaction_id TEXT,
+    origin TEXT,
+    ts BIGINT,
     response_code INTEGER,
-    response_json TEXT,
-    has_been_referenced BOOL default 0, -- Whether thishas been referenced by a prev_tx
-    CONSTRAINT uniquesss UNIQUE (transaction_id, origin) ON CONFLICT REPLACE
+    response_json bytea,
+    has_been_referenced SMALLINT DEFAULT 0, -- Whether thishas been referenced by a prev_tx
+    UNIQUE (transaction_id, origin)
 );
 
-CREATE UNIQUE INDEX IF NOT EXISTS transactions_txid ON received_transactions(transaction_id, origin);
-CREATE INDEX IF NOT EXISTS transactions_have_ref ON received_transactions(origin, has_been_referenced);-- WHERE has_been_referenced = 0;
+CREATE INDEX transactions_have_ref ON received_transactions(origin, has_been_referenced);-- WHERE has_been_referenced = 0;
 
 
 -- Stores what transactions we've sent, what their response was (if we got one) and whether we have
@@ -35,17 +34,14 @@ CREATE TABLE IF NOT EXISTS sent_transactions(
     destination TEXT,
     response_code INTEGER DEFAULT 0,
     response_json TEXT,
-    ts INTEGER
+    ts BIGINT
 );
 
-CREATE INDEX IF NOT EXISTS sent_transaction_dest ON sent_transactions(destination);
-CREATE INDEX IF NOT EXISTS sent_transaction_dest_referenced ON sent_transactions(
-    destination
-);
-CREATE INDEX IF NOT EXISTS sent_transaction_txn_id ON sent_transactions(transaction_id);
+CREATE INDEX sent_transaction_dest ON sent_transactions(destination);
+CREATE INDEX sent_transaction_txn_id ON sent_transactions(transaction_id);
 -- So that we can do an efficient look up of all transactions that have yet to be successfully
 -- sent.
-CREATE INDEX IF NOT EXISTS sent_transaction_sent ON sent_transactions(response_code);
+CREATE INDEX sent_transaction_sent ON sent_transactions(response_code);
 
 
 -- For sent transactions only.
@@ -56,13 +52,12 @@ CREATE TABLE IF NOT EXISTS transaction_id_to_pdu(
     pdu_origin TEXT
 );
 
-CREATE INDEX IF NOT EXISTS transaction_id_to_pdu_tx ON transaction_id_to_pdu(transaction_id, destination);
-CREATE INDEX IF NOT EXISTS transaction_id_to_pdu_dest ON transaction_id_to_pdu(destination);
-CREATE INDEX IF NOT EXISTS transaction_id_to_pdu_index ON transaction_id_to_pdu(transaction_id, destination);
+CREATE INDEX transaction_id_to_pdu_tx ON transaction_id_to_pdu(transaction_id, destination);
+CREATE INDEX transaction_id_to_pdu_dest ON transaction_id_to_pdu(destination);
 
 -- To track destination health
 CREATE TABLE IF NOT EXISTS destinations(
     destination TEXT PRIMARY KEY,
-    retry_last_ts INTEGER,
+    retry_last_ts BIGINT,
     retry_interval INTEGER
 );
diff --git a/synapse/storage/schema/full_schemas/11/users.sql b/synapse/storage/schema/full_schemas/11/users.sql
index 08ccfdac0a..eec3da3c35 100644
--- a/synapse/storage/schema/full_schemas/11/users.sql
+++ b/synapse/storage/schema/full_schemas/11/users.sql
@@ -16,19 +16,18 @@ CREATE TABLE IF NOT EXISTS users(
     id INTEGER PRIMARY KEY AUTOINCREMENT,
     name TEXT,
     password_hash TEXT,
-    creation_ts INTEGER,
-    admin BOOL DEFAULT 0 NOT NULL,
-    UNIQUE(name) ON CONFLICT ROLLBACK
+    creation_ts BIGINT,
+    admin SMALLINT DEFAULT 0 NOT NULL,
+    UNIQUE(name)
 );
 
 CREATE TABLE IF NOT EXISTS access_tokens(
     id INTEGER PRIMARY KEY AUTOINCREMENT,
-    user_id INTEGER NOT NULL,
+    user_id TEXT NOT NULL,
     device_id TEXT,
     token TEXT NOT NULL,
-    last_used INTEGER,
-    FOREIGN KEY(user_id) REFERENCES users(id),
-    UNIQUE(token) ON CONFLICT ROLLBACK
+    last_used BIGINT,
+    UNIQUE(token)
 );
 
 CREATE TABLE IF NOT EXISTS user_ips (
@@ -37,9 +36,8 @@ CREATE TABLE IF NOT EXISTS user_ips (
     device_id TEXT,
     ip TEXT NOT NULL,
     user_agent TEXT NOT NULL,
-    last_seen INTEGER NOT NULL,
-    CONSTRAINT user_ip UNIQUE (user, access_token, ip, user_agent) ON CONFLICT REPLACE
+    last_seen BIGINT NOT NULL,
+    UNIQUE (user, access_token, ip, user_agent)
 );
 
-CREATE INDEX IF NOT EXISTS user_ips_user ON user_ips(user);
-
+CREATE INDEX user_ips_user ON user_ips(user);
diff --git a/synapse/storage/schema/full_schemas/16/application_services.sql b/synapse/storage/schema/full_schemas/16/application_services.sql
new file mode 100644
index 0000000000..d382d63fbd
--- /dev/null
+++ b/synapse/storage/schema/full_schemas/16/application_services.sql
@@ -0,0 +1,48 @@
+/* Copyright 2015 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE TABLE IF NOT EXISTS application_services(
+    id BIGINT PRIMARY KEY,
+    url TEXT,
+    token TEXT,
+    hs_token TEXT,
+    sender TEXT,
+    UNIQUE(token)
+);
+
+CREATE TABLE IF NOT EXISTS application_services_regex(
+    id BIGINT PRIMARY KEY,
+    as_id BIGINT NOT NULL,
+    namespace INTEGER,  /* enum[room_id|room_alias|user_id] */
+    regex TEXT,
+    FOREIGN KEY(as_id) REFERENCES application_services(id)
+);
+
+CREATE TABLE IF NOT EXISTS application_services_state(
+    as_id TEXT PRIMARY KEY,
+    state VARCHAR(5),
+    last_txn INTEGER
+);
+
+CREATE TABLE IF NOT EXISTS application_services_txns(
+    as_id TEXT NOT NULL,
+    txn_id INTEGER NOT NULL,
+    event_ids TEXT NOT NULL,
+    UNIQUE(as_id, txn_id)
+);
+
+CREATE INDEX application_services_txns_id ON application_services_txns (
+    as_id
+);
diff --git a/synapse/storage/schema/full_schemas/16/event_edges.sql b/synapse/storage/schema/full_schemas/16/event_edges.sql
new file mode 100644
index 0000000000..f7020f7793
--- /dev/null
+++ b/synapse/storage/schema/full_schemas/16/event_edges.sql
@@ -0,0 +1,89 @@
+/* Copyright 2014, 2015 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE TABLE IF NOT EXISTS event_forward_extremities(
+    event_id TEXT NOT NULL,
+    room_id TEXT NOT NULL,
+    UNIQUE (event_id, room_id)
+);
+
+CREATE INDEX ev_extrem_room ON event_forward_extremities(room_id);
+CREATE INDEX ev_extrem_id ON event_forward_extremities(event_id);
+
+
+CREATE TABLE IF NOT EXISTS event_backward_extremities(
+    event_id TEXT NOT NULL,
+    room_id TEXT NOT NULL,
+    UNIQUE (event_id, room_id)
+);
+
+CREATE INDEX ev_b_extrem_room ON event_backward_extremities(room_id);
+CREATE INDEX ev_b_extrem_id ON event_backward_extremities(event_id);
+
+
+CREATE TABLE IF NOT EXISTS event_edges(
+    event_id TEXT NOT NULL,
+    prev_event_id TEXT NOT NULL,
+    room_id TEXT NOT NULL,
+    is_state BOOL NOT NULL,
+    UNIQUE (event_id, prev_event_id, room_id, is_state)
+);
+
+CREATE INDEX ev_edges_id ON event_edges(event_id);
+CREATE INDEX ev_edges_prev_id ON event_edges(prev_event_id);
+
+
+CREATE TABLE IF NOT EXISTS room_depth(
+    room_id TEXT NOT NULL,
+    min_depth INTEGER NOT NULL,
+    UNIQUE (room_id)
+);
+
+CREATE INDEX room_depth_room ON room_depth(room_id);
+
+
+create TABLE IF NOT EXISTS event_destinations(
+    event_id TEXT NOT NULL,
+    destination TEXT NOT NULL,
+    delivered_ts BIGINT DEFAULT 0, -- or 0 if not delivered
+    UNIQUE (event_id, destination)
+);
+
+CREATE INDEX event_destinations_id ON event_destinations(event_id);
+
+
+CREATE TABLE IF NOT EXISTS state_forward_extremities(
+    event_id TEXT NOT NULL,
+    room_id TEXT NOT NULL,
+    type TEXT NOT NULL,
+    state_key TEXT NOT NULL,
+    UNIQUE (event_id, room_id)
+);
+
+CREATE INDEX st_extrem_keys ON state_forward_extremities(
+    room_id, type, state_key
+);
+CREATE INDEX st_extrem_id ON state_forward_extremities(event_id);
+
+
+CREATE TABLE IF NOT EXISTS event_auth(
+    event_id TEXT NOT NULL,
+    auth_id TEXT NOT NULL,
+    room_id TEXT NOT NULL,
+    UNIQUE (event_id, auth_id, room_id)
+);
+
+CREATE INDEX evauth_edges_id ON event_auth(event_id);
+CREATE INDEX evauth_edges_auth_id ON event_auth(auth_id);
diff --git a/synapse/storage/schema/full_schemas/16/event_signatures.sql b/synapse/storage/schema/full_schemas/16/event_signatures.sql
new file mode 100644
index 0000000000..636b2d3353
--- /dev/null
+++ b/synapse/storage/schema/full_schemas/16/event_signatures.sql
@@ -0,0 +1,55 @@
+/* Copyright 2014, 2015 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE TABLE IF NOT EXISTS event_content_hashes (
+    event_id TEXT,
+    algorithm TEXT,
+    hash bytea,
+    UNIQUE (event_id, algorithm)
+);
+
+CREATE INDEX event_content_hashes_id ON event_content_hashes(event_id);
+
+
+CREATE TABLE IF NOT EXISTS event_reference_hashes (
+    event_id TEXT,
+    algorithm TEXT,
+    hash bytea,
+    UNIQUE (event_id, algorithm)
+);
+
+CREATE INDEX event_reference_hashes_id ON event_reference_hashes(event_id);
+
+
+CREATE TABLE IF NOT EXISTS event_signatures (
+    event_id TEXT,
+    signature_name TEXT,
+    key_id TEXT,
+    signature bytea,
+    UNIQUE (event_id, signature_name, key_id)
+);
+
+CREATE INDEX event_signatures_id ON event_signatures(event_id);
+
+
+CREATE TABLE IF NOT EXISTS event_edge_hashes(
+    event_id TEXT,
+    prev_event_id TEXT,
+    algorithm TEXT,
+    hash bytea,
+    UNIQUE (event_id, prev_event_id, algorithm)
+);
+
+CREATE INDEX event_edge_hashes_id ON event_edge_hashes(event_id);
diff --git a/synapse/storage/schema/full_schemas/16/im.sql b/synapse/storage/schema/full_schemas/16/im.sql
new file mode 100644
index 0000000000..576653a3c9
--- /dev/null
+++ b/synapse/storage/schema/full_schemas/16/im.sql
@@ -0,0 +1,128 @@
+/* Copyright 2014, 2015 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE TABLE IF NOT EXISTS events(
+    stream_ordering INTEGER PRIMARY KEY,
+    topological_ordering BIGINT NOT NULL,
+    event_id TEXT NOT NULL,
+    type TEXT NOT NULL,
+    room_id TEXT NOT NULL,
+    content TEXT NOT NULL,
+    unrecognized_keys TEXT,
+    processed BOOL NOT NULL,
+    outlier BOOL NOT NULL,
+    depth BIGINT DEFAULT 0 NOT NULL,
+    UNIQUE (event_id)
+);
+
+CREATE INDEX events_stream_ordering ON events (stream_ordering);
+CREATE INDEX events_topological_ordering ON events (topological_ordering);
+CREATE INDEX events_order ON events (topological_ordering, stream_ordering);
+CREATE INDEX events_room_id ON events (room_id);
+CREATE INDEX events_order_room ON events (
+    room_id, topological_ordering, stream_ordering
+);
+
+
+CREATE TABLE IF NOT EXISTS event_json(
+    event_id TEXT NOT NULL,
+    room_id TEXT NOT NULL,
+    internal_metadata TEXT NOT NULL,
+    json TEXT NOT NULL,
+    UNIQUE (event_id)
+);
+
+CREATE INDEX event_json_room_id ON event_json(room_id);
+
+
+CREATE TABLE IF NOT EXISTS state_events(
+    event_id TEXT NOT NULL,
+    room_id TEXT NOT NULL,
+    type TEXT NOT NULL,
+    state_key TEXT NOT NULL,
+    prev_state TEXT,
+    UNIQUE (event_id)
+);
+
+CREATE INDEX state_events_room_id ON state_events (room_id);
+CREATE INDEX state_events_type ON state_events (type);
+CREATE INDEX state_events_state_key ON state_events (state_key);
+
+
+CREATE TABLE IF NOT EXISTS current_state_events(
+    event_id TEXT NOT NULL,
+    room_id TEXT NOT NULL,
+    type TEXT NOT NULL,
+    state_key TEXT NOT NULL,
+    UNIQUE (event_id),
+    UNIQUE (room_id, type, state_key)
+);
+
+CREATE INDEX current_state_events_room_id ON current_state_events (room_id);
+CREATE INDEX current_state_events_type ON current_state_events (type);
+CREATE INDEX current_state_events_state_key ON current_state_events (state_key);
+
+CREATE TABLE IF NOT EXISTS room_memberships(
+    event_id TEXT NOT NULL,
+    user_id TEXT NOT NULL,
+    sender TEXT NOT NULL,
+    room_id TEXT NOT NULL,
+    membership TEXT NOT NULL,
+    UNIQUE (event_id)
+);
+
+CREATE INDEX room_memberships_room_id ON room_memberships (room_id);
+CREATE INDEX room_memberships_user_id ON room_memberships (user_id);
+
+CREATE TABLE IF NOT EXISTS feedback(
+    event_id TEXT NOT NULL,
+    feedback_type TEXT,
+    target_event_id TEXT,
+    sender TEXT,
+    room_id TEXT,
+    UNIQUE (event_id)
+);
+
+CREATE TABLE IF NOT EXISTS topics(
+    event_id TEXT NOT NULL,
+    room_id TEXT NOT NULL,
+    topic TEXT NOT NULL,
+    UNIQUE (event_id)
+);
+
+CREATE INDEX topics_room_id ON topics(room_id);
+
+CREATE TABLE IF NOT EXISTS room_names(
+    event_id TEXT NOT NULL,
+    room_id TEXT NOT NULL,
+    name TEXT NOT NULL,
+    UNIQUE (event_id)
+);
+
+CREATE INDEX room_names_room_id ON room_names(room_id);
+
+CREATE TABLE IF NOT EXISTS rooms(
+    room_id TEXT PRIMARY KEY NOT NULL,
+    is_public BOOL,
+    creator TEXT
+);
+
+CREATE TABLE IF NOT EXISTS room_hosts(
+    room_id TEXT NOT NULL,
+    host TEXT NOT NULL,
+    UNIQUE (room_id, host)
+);
+
+CREATE INDEX room_hosts_room_id ON room_hosts (room_id);
diff --git a/synapse/storage/schema/full_schemas/16/keys.sql b/synapse/storage/schema/full_schemas/16/keys.sql
new file mode 100644
index 0000000000..afc142045e
--- /dev/null
+++ b/synapse/storage/schema/full_schemas/16/keys.sql
@@ -0,0 +1,31 @@
+/* Copyright 2014, 2015 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+CREATE TABLE IF NOT EXISTS server_tls_certificates(
+  server_name TEXT, -- Server name.
+  fingerprint TEXT, -- Certificate fingerprint.
+  from_server TEXT, -- Which key server the certificate was fetched from.
+  ts_added_ms BIGINT, -- When the certifcate was added.
+  tls_certificate bytea, -- DER encoded x509 certificate.
+  UNIQUE (server_name, fingerprint)
+);
+
+CREATE TABLE IF NOT EXISTS server_signature_keys(
+  server_name TEXT, -- Server name.
+  key_id TEXT, -- Key version.
+  from_server TEXT, -- Which key server the key was fetched form.
+  ts_added_ms BIGINT, -- When the key was added.
+  verify_key bytea, -- NACL verification key.
+  UNIQUE (server_name, key_id)
+);
diff --git a/synapse/storage/schema/full_schemas/16/media_repository.sql b/synapse/storage/schema/full_schemas/16/media_repository.sql
new file mode 100644
index 0000000000..dacbda40ca
--- /dev/null
+++ b/synapse/storage/schema/full_schemas/16/media_repository.sql
@@ -0,0 +1,68 @@
+/* Copyright 2014, 2015 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE TABLE IF NOT EXISTS local_media_repository (
+    media_id TEXT, -- The id used to refer to the media.
+    media_type TEXT, -- The MIME-type of the media.
+    media_length INTEGER, -- Length of the media in bytes.
+    created_ts BIGINT, -- When the content was uploaded in ms.
+    upload_name TEXT, -- The name the media was uploaded with.
+    user_id TEXT, -- The user who uploaded the file.
+    UNIQUE (media_id)
+);
+
+CREATE TABLE IF NOT EXISTS local_media_repository_thumbnails (
+    media_id TEXT, -- The id used to refer to the media.
+    thumbnail_width INTEGER, -- The width of the thumbnail in pixels.
+    thumbnail_height INTEGER, -- The height of the thumbnail in pixels.
+    thumbnail_type TEXT, -- The MIME-type of the thumbnail.
+    thumbnail_method TEXT, -- The method used to make the thumbnail.
+    thumbnail_length INTEGER, -- The length of the thumbnail in bytes.
+    UNIQUE (
+        media_id, thumbnail_width, thumbnail_height, thumbnail_type
+    )
+);
+
+CREATE INDEX local_media_repository_thumbnails_media_id
+    ON local_media_repository_thumbnails (media_id);
+
+CREATE TABLE IF NOT EXISTS remote_media_cache (
+    media_origin TEXT, -- The remote HS the media came from.
+    media_id TEXT, -- The id used to refer to the media on that server.
+    media_type TEXT, -- The MIME-type of the media.
+    created_ts BIGINT, -- When the content was uploaded in ms.
+    upload_name TEXT, -- The name the media was uploaded with.
+    media_length INTEGER, -- Length of the media in bytes.
+    filesystem_id TEXT, -- The name used to store the media on disk.
+    UNIQUE (media_origin, media_id)
+);
+
+CREATE TABLE IF NOT EXISTS remote_media_cache_thumbnails (
+    media_origin TEXT, -- The remote HS the media came from.
+    media_id TEXT, -- The id used to refer to the media.
+    thumbnail_width INTEGER, -- The width of the thumbnail in pixels.
+    thumbnail_height INTEGER, -- The height of the thumbnail in pixels.
+    thumbnail_method TEXT, -- The method used to make the thumbnail
+    thumbnail_type TEXT, -- The MIME-type of the thumbnail.
+    thumbnail_length INTEGER, -- The length of the thumbnail in bytes.
+    filesystem_id TEXT, -- The name used to store the media on disk.
+    UNIQUE (
+        media_origin, media_id, thumbnail_width, thumbnail_height,
+        thumbnail_type
+     )
+);
+
+CREATE INDEX remote_media_cache_thumbnails_media_id
+    ON remote_media_cache_thumbnails (media_id);
diff --git a/synapse/storage/schema/full_schemas/16/presence.sql b/synapse/storage/schema/full_schemas/16/presence.sql
new file mode 100644
index 0000000000..80088413ba
--- /dev/null
+++ b/synapse/storage/schema/full_schemas/16/presence.sql
@@ -0,0 +1,40 @@
+/* Copyright 2014, 2015 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+CREATE TABLE IF NOT EXISTS presence(
+  user_id TEXT NOT NULL,
+  state VARCHAR(20),
+  status_msg TEXT,
+  mtime BIGINT, -- miliseconds since last state change
+  UNIQUE (user_id)
+);
+
+-- For each of /my/ users which possibly-remote users are allowed to see their
+-- presence state
+CREATE TABLE IF NOT EXISTS presence_allow_inbound(
+  observed_user_id TEXT NOT NULL,
+  observer_user_id TEXT NOT NULL, -- a UserID,
+  UNIQUE (observed_user_id, observer_user_id)
+);
+
+-- For each of /my/ users (watcher), which possibly-remote users are they
+-- watching?
+CREATE TABLE IF NOT EXISTS presence_list(
+  user_id TEXT NOT NULL,
+  observed_user_id TEXT NOT NULL, -- a UserID,
+  accepted BOOLEAN NOT NULL,
+  UNIQUE (user_id, observed_user_id)
+);
+
+CREATE INDEX presence_list_user_id ON presence_list (user_id);
diff --git a/synapse/storage/schema/full_schemas/16/profiles.sql b/synapse/storage/schema/full_schemas/16/profiles.sql
new file mode 100644
index 0000000000..934be86520
--- /dev/null
+++ b/synapse/storage/schema/full_schemas/16/profiles.sql
@@ -0,0 +1,20 @@
+/* Copyright 2014, 2015 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+CREATE TABLE IF NOT EXISTS profiles(
+    user_id TEXT NOT NULL,
+    displayname TEXT,
+    avatar_url TEXT,
+    UNIQUE(user_id)
+);
diff --git a/synapse/storage/schema/full_schemas/16/push.sql b/synapse/storage/schema/full_schemas/16/push.sql
new file mode 100644
index 0000000000..db6e05cbdf
--- /dev/null
+++ b/synapse/storage/schema/full_schemas/16/push.sql
@@ -0,0 +1,73 @@
+/* Copyright 2015 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE TABLE IF NOT EXISTS rejections(
+    event_id TEXT NOT NULL,
+    reason TEXT NOT NULL,
+    last_check TEXT NOT NULL,
+    UNIQUE (event_id)
+);
+
+-- Push notification endpoints that users have configured
+CREATE TABLE IF NOT EXISTS pushers (
+  id BIGINT PRIMARY KEY,
+  user_name TEXT NOT NULL,
+  profile_tag VARCHAR(32) NOT NULL,
+  kind VARCHAR(8) NOT NULL,
+  app_id VARCHAR(64) NOT NULL,
+  app_display_name VARCHAR(64) NOT NULL,
+  device_display_name VARCHAR(128) NOT NULL,
+  pushkey bytea NOT NULL,
+  ts BIGINT NOT NULL,
+  lang VARCHAR(8),
+  data bytea,
+  last_token TEXT,
+  last_success BIGINT,
+  failing_since BIGINT,
+  UNIQUE (app_id, pushkey)
+);
+
+CREATE TABLE IF NOT EXISTS push_rules (
+  id BIGINT PRIMARY KEY,
+  user_name TEXT NOT NULL,
+  rule_id TEXT NOT NULL,
+  priority_class SMALLINT NOT NULL,
+  priority INTEGER NOT NULL DEFAULT 0,
+  conditions TEXT NOT NULL,
+  actions TEXT NOT NULL,
+  UNIQUE(user_name, rule_id)
+);
+
+CREATE INDEX push_rules_user_name on push_rules (user_name);
+
+CREATE TABLE IF NOT EXISTS user_filters(
+  user_id TEXT,
+  filter_id BIGINT,
+  filter_json bytea
+);
+
+CREATE INDEX user_filters_by_user_id_filter_id ON user_filters(
+    user_id, filter_id
+);
+
+CREATE TABLE IF NOT EXISTS push_rules_enable (
+  id BIGINT PRIMARY KEY,
+  user_name TEXT NOT NULL,
+  rule_id TEXT NOT NULL,
+  enabled SMALLINT,
+  UNIQUE(user_name, rule_id)
+);
+
+CREATE INDEX push_rules_enable_user_name on push_rules_enable (user_name);
diff --git a/synapse/storage/schema/full_schemas/16/redactions.sql b/synapse/storage/schema/full_schemas/16/redactions.sql
new file mode 100644
index 0000000000..69621955d4
--- /dev/null
+++ b/synapse/storage/schema/full_schemas/16/redactions.sql
@@ -0,0 +1,22 @@
+/* Copyright 2014, 2015 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+CREATE TABLE IF NOT EXISTS redactions (
+    event_id TEXT NOT NULL,
+    redacts TEXT NOT NULL,
+    UNIQUE (event_id)
+);
+
+CREATE INDEX redactions_event_id ON redactions (event_id);
+CREATE INDEX redactions_redacts ON redactions (redacts);
diff --git a/synapse/storage/schema/full_schemas/16/room_aliases.sql b/synapse/storage/schema/full_schemas/16/room_aliases.sql
new file mode 100644
index 0000000000..412bb97fad
--- /dev/null
+++ b/synapse/storage/schema/full_schemas/16/room_aliases.sql
@@ -0,0 +1,29 @@
+/* Copyright 2014, 2015 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE TABLE IF NOT EXISTS room_aliases(
+    room_alias TEXT NOT NULL,
+    room_id TEXT NOT NULL,
+    UNIQUE (room_alias)
+);
+
+CREATE INDEX room_aliases_id ON room_aliases(room_id);
+
+CREATE TABLE IF NOT EXISTS room_alias_servers(
+    room_alias TEXT NOT NULL,
+    server TEXT NOT NULL
+);
+
+CREATE INDEX room_alias_servers_alias ON room_alias_servers(room_alias);
diff --git a/synapse/storage/schema/full_schemas/16/state.sql b/synapse/storage/schema/full_schemas/16/state.sql
new file mode 100644
index 0000000000..705cac6ce9
--- /dev/null
+++ b/synapse/storage/schema/full_schemas/16/state.sql
@@ -0,0 +1,40 @@
+/* Copyright 2014, 2015 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE TABLE IF NOT EXISTS state_groups(
+    id BIGINT PRIMARY KEY,
+    room_id TEXT NOT NULL,
+    event_id TEXT NOT NULL
+);
+
+CREATE TABLE IF NOT EXISTS state_groups_state(
+    state_group BIGINT NOT NULL,
+    room_id TEXT NOT NULL,
+    type TEXT NOT NULL,
+    state_key TEXT NOT NULL,
+    event_id TEXT NOT NULL
+);
+
+CREATE TABLE IF NOT EXISTS event_to_state_groups(
+    event_id TEXT NOT NULL,
+    state_group BIGINT NOT NULL,
+    UNIQUE (event_id)
+);
+
+CREATE INDEX state_groups_id ON state_groups(id);
+
+CREATE INDEX state_groups_state_id ON state_groups_state(state_group);
+CREATE INDEX state_groups_state_tuple ON state_groups_state(room_id, type, state_key);
+CREATE INDEX event_to_state_groups_id ON event_to_state_groups(event_id);
diff --git a/synapse/storage/schema/full_schemas/16/transactions.sql b/synapse/storage/schema/full_schemas/16/transactions.sql
new file mode 100644
index 0000000000..1ab77cdb63
--- /dev/null
+++ b/synapse/storage/schema/full_schemas/16/transactions.sql
@@ -0,0 +1,63 @@
+/* Copyright 2014, 2015 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+-- Stores what transaction ids we have received and what our response was
+CREATE TABLE IF NOT EXISTS received_transactions(
+    transaction_id TEXT,
+    origin TEXT,
+    ts BIGINT,
+    response_code INTEGER,
+    response_json bytea,
+    has_been_referenced smallint default 0, -- Whether thishas been referenced by a prev_tx
+    UNIQUE (transaction_id, origin)
+);
+
+CREATE INDEX transactions_have_ref ON received_transactions(origin, has_been_referenced);-- WHERE has_been_referenced = 0;
+
+
+-- Stores what transactions we've sent, what their response was (if we got one) and whether we have
+-- since referenced the transaction in another outgoing transaction
+CREATE TABLE IF NOT EXISTS sent_transactions(
+    id BIGINT PRIMARY KEY, -- This is used to apply insertion ordering
+    transaction_id TEXT,
+    destination TEXT,
+    response_code INTEGER DEFAULT 0,
+    response_json TEXT,
+    ts BIGINT
+);
+
+CREATE INDEX sent_transaction_dest ON sent_transactions(destination);
+CREATE INDEX sent_transaction_txn_id ON sent_transactions(transaction_id);
+-- So that we can do an efficient look up of all transactions that have yet to be successfully
+-- sent.
+CREATE INDEX sent_transaction_sent ON sent_transactions(response_code);
+
+
+-- For sent transactions only.
+CREATE TABLE IF NOT EXISTS transaction_id_to_pdu(
+    transaction_id INTEGER,
+    destination TEXT,
+    pdu_id TEXT,
+    pdu_origin TEXT,
+    UNIQUE (transaction_id, destination)
+);
+
+CREATE INDEX transaction_id_to_pdu_dest ON transaction_id_to_pdu(destination);
+
+-- To track destination health
+CREATE TABLE IF NOT EXISTS destinations(
+    destination TEXT PRIMARY KEY,
+    retry_last_ts BIGINT,
+    retry_interval INTEGER
+);
diff --git a/synapse/storage/schema/full_schemas/16/users.sql b/synapse/storage/schema/full_schemas/16/users.sql
new file mode 100644
index 0000000000..d2fa3122da
--- /dev/null
+++ b/synapse/storage/schema/full_schemas/16/users.sql
@@ -0,0 +1,42 @@
+/* Copyright 2014, 2015 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+CREATE TABLE IF NOT EXISTS users(
+    name TEXT,
+    password_hash TEXT,
+    creation_ts BIGINT,
+    admin SMALLINT DEFAULT 0 NOT NULL,
+    UNIQUE(name)
+);
+
+CREATE TABLE IF NOT EXISTS access_tokens(
+    id BIGINT PRIMARY KEY,
+    user_id TEXT NOT NULL,
+    device_id TEXT,
+    token TEXT NOT NULL,
+    last_used BIGINT,
+    UNIQUE(token)
+);
+
+CREATE TABLE IF NOT EXISTS user_ips (
+    user_id TEXT NOT NULL,
+    access_token TEXT NOT NULL,
+    device_id TEXT,
+    ip TEXT NOT NULL,
+    user_agent TEXT NOT NULL,
+    last_seen BIGINT NOT NULL
+);
+
+CREATE INDEX user_ips_user ON user_ips(user_id);
+CREATE INDEX user_ips_user_ip ON user_ips(user_id, access_token, ip);
diff --git a/synapse/storage/schema/schema_version.sql b/synapse/storage/schema/schema_version.sql
index 0431e2d051..d682608aa0 100644
--- a/synapse/storage/schema/schema_version.sql
+++ b/synapse/storage/schema/schema_version.sql
@@ -14,17 +14,14 @@
  */
 
 CREATE TABLE IF NOT EXISTS schema_version(
-    Lock char(1) NOT NULL DEFAULT 'X',  -- Makes sure this table only has one row.
+    Lock CHAR(1) NOT NULL DEFAULT 'X' UNIQUE,  -- Makes sure this table only has one row.
     version INTEGER NOT NULL,
     upgraded BOOL NOT NULL,  -- Whether we reached this version from an upgrade or an initial schema.
-    CONSTRAINT schema_version_lock_x CHECK (Lock='X')
-    CONSTRAINT schema_version_lock_uniq UNIQUE (Lock)
+    CHECK (Lock='X')
 );
 
 CREATE TABLE IF NOT EXISTS applied_schema_deltas(
     version INTEGER NOT NULL,
     file TEXT NOT NULL,
-    CONSTRAINT schema_deltas_ver_file UNIQUE (version, file) ON CONFLICT IGNORE
+    UNIQUE(version, file)
 );
-
-CREATE INDEX IF NOT EXISTS schema_deltas_ver ON applied_schema_deltas(version);
diff --git a/synapse/storage/signatures.py b/synapse/storage/signatures.py
index d0d53770f2..f051828630 100644
--- a/synapse/storage/signatures.py
+++ b/synapse/storage/signatures.py
@@ -56,7 +56,6 @@ class SignatureStore(SQLBaseStore):
                 "algorithm": algorithm,
                 "hash": buffer(hash_bytes),
             },
-            or_ignore=True,
         )
 
     def get_event_reference_hashes(self, event_ids):
@@ -100,7 +99,7 @@ class SignatureStore(SQLBaseStore):
             " WHERE event_id = ?"
         )
         txn.execute(query, (event_id, ))
-        return dict(txn.fetchall())
+        return {k: v for k, v in txn.fetchall()}
 
     def _store_event_reference_hash_txn(self, txn, event_id, algorithm,
                                         hash_bytes):
@@ -119,7 +118,6 @@ class SignatureStore(SQLBaseStore):
                 "algorithm": algorithm,
                 "hash": buffer(hash_bytes),
             },
-            or_ignore=True,
         )
 
     def _get_event_signatures_txn(self, txn, event_id):
@@ -164,7 +162,6 @@ class SignatureStore(SQLBaseStore):
                 "key_id": key_id,
                 "signature": buffer(signature_bytes),
             },
-            or_ignore=True,
         )
 
     def _get_prev_event_hashes_txn(self, txn, event_id):
@@ -198,5 +195,4 @@ class SignatureStore(SQLBaseStore):
                 "algorithm": algorithm,
                 "hash": buffer(hash_bytes),
             },
-            or_ignore=True,
         )
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index 58dbf2802b..553ba9dd1f 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -17,6 +17,8 @@ from ._base import SQLBaseStore
 
 from twisted.internet import defer
 
+from synapse.util.stringutils import random_string
+
 import logging
 
 logger = logging.getLogger(__name__)
@@ -91,14 +93,15 @@ class StateStore(SQLBaseStore):
 
         state_group = context.state_group
         if not state_group:
-            state_group = self._simple_insert_txn(
+            state_group = self._state_groups_id_gen.get_next_txn(txn)
+            self._simple_insert_txn(
                 txn,
                 table="state_groups",
                 values={
+                    "id": state_group,
                     "room_id": event.room_id,
                     "event_id": event.event_id,
                 },
-                or_ignore=True,
             )
 
             for state in state_events.values():
@@ -112,7 +115,6 @@ class StateStore(SQLBaseStore):
                         "state_key": state.state_key,
                         "event_id": state.event_id,
                     },
-                    or_ignore=True,
                 )
 
         self._simple_insert_txn(
@@ -122,7 +124,6 @@ class StateStore(SQLBaseStore):
                 "state_group": state_group,
                 "event_id": event.event_id,
             },
-            or_replace=True,
         )
 
     @defer.inlineCallbacks
@@ -154,3 +155,7 @@ class StateStore(SQLBaseStore):
 
         events = yield self._parse_events(results)
         defer.returnValue(events)
+
+
+def _make_group_id(clock):
+    return str(int(clock.time_msec())) + random_string(5)
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index 66f307e640..df6de7cbcd 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -35,7 +35,7 @@ what sort order was used:
 
 from twisted.internet import defer
 
-from ._base import SQLBaseStore, cached
+from ._base import SQLBaseStore
 from synapse.api.constants import EventTypes
 from synapse.api.errors import SynapseError
 from synapse.util.logutils import log_function
@@ -110,7 +110,7 @@ class _StreamToken(namedtuple("_StreamToken", "topological stream")):
         if self.topological is None:
             return "(%d < %s)" % (self.stream, "stream_ordering")
         else:
-            return "(%d < %s OR (%d == %s AND %d < %s))" % (
+            return "(%d < %s OR (%d = %s AND %d < %s))" % (
                 self.topological, "topological_ordering",
                 self.topological, "topological_ordering",
                 self.stream, "stream_ordering",
@@ -120,7 +120,7 @@ class _StreamToken(namedtuple("_StreamToken", "topological stream")):
         if self.topological is None:
             return "(%d >= %s)" % (self.stream, "stream_ordering")
         else:
-            return "(%d > %s OR (%d == %s AND %d >= %s))" % (
+            return "(%d > %s OR (%d = %s AND %d >= %s))" % (
                 self.topological, "topological_ordering",
                 self.topological, "topological_ordering",
                 self.stream, "stream_ordering",
@@ -240,7 +240,7 @@ class StreamStore(SQLBaseStore):
 
         sql = (
             "SELECT e.event_id, e.stream_ordering FROM events AS e WHERE "
-            "(e.outlier = 0 AND (room_id IN (%(current)s)) OR "
+            "(e.outlier = ? AND (room_id IN (%(current)s)) OR "
             "(event_id IN (%(invites)s))) "
             "AND e.stream_ordering > ? AND e.stream_ordering <= ? "
             "ORDER BY stream_ordering ASC LIMIT %(limit)d "
@@ -251,7 +251,7 @@ class StreamStore(SQLBaseStore):
         }
 
         def f(txn):
-            txn.execute(sql, (user_id, user_id, from_id.stream, to_id.stream,))
+            txn.execute(sql, (False, user_id, user_id, from_id.stream, to_id.stream,))
 
             rows = self.cursor_to_dict(txn)
 
@@ -283,7 +283,7 @@ class StreamStore(SQLBaseStore):
         # Tokens really represent positions between elements, but we use
         # the convention of pointing to the event before the gap. Hence
         # we have a bit of asymmetry when it comes to equalities.
-        args = [room_id]
+        args = [False, room_id]
         if direction == 'b':
             order = "DESC"
             bounds = _StreamToken.parse(from_key).upper_bound()
@@ -307,7 +307,7 @@ class StreamStore(SQLBaseStore):
 
         sql = (
             "SELECT * FROM events"
-            " WHERE outlier = 0 AND room_id = ? AND %(bounds)s"
+            " WHERE outlier = ? AND room_id = ? AND %(bounds)s"
             " ORDER BY topological_ordering %(order)s,"
             " stream_ordering %(order)s %(limit)s"
         ) % {
@@ -358,7 +358,7 @@ class StreamStore(SQLBaseStore):
             sql = (
                 "SELECT stream_ordering, topological_ordering, event_id"
                 " FROM events"
-                " WHERE room_id = ? AND stream_ordering <= ? AND outlier = 0"
+                " WHERE room_id = ? AND stream_ordering <= ? AND outlier = ?"
                 " ORDER BY topological_ordering DESC, stream_ordering DESC"
                 " LIMIT ?"
             )
@@ -368,17 +368,17 @@ class StreamStore(SQLBaseStore):
                 "SELECT stream_ordering, topological_ordering, event_id"
                 " FROM events"
                 " WHERE room_id = ? AND stream_ordering > ?"
-                " AND stream_ordering <= ? AND outlier = 0"
+                " AND stream_ordering <= ? AND outlier = ?"
                 " ORDER BY topological_ordering DESC, stream_ordering DESC"
                 " LIMIT ?"
             )
 
         def get_recent_events_for_room_txn(txn):
             if from_token is None:
-                txn.execute(sql, (room_id, end_token.stream, limit,))
+                txn.execute(sql, (room_id, end_token.stream, False, limit,))
             else:
                 txn.execute(sql, (
-                    room_id, from_token.stream, end_token.stream, limit
+                    room_id, from_token.stream, end_token.stream, False, limit
                 ))
 
             rows = self.cursor_to_dict(txn)
@@ -413,12 +413,10 @@ class StreamStore(SQLBaseStore):
             "get_recent_events_for_room", get_recent_events_for_room_txn
         )
 
-    @cached(num_args=0)
+    @defer.inlineCallbacks
     def get_room_events_max_id(self):
-        return self.runInteraction(
-            "get_room_events_max_id",
-            self._get_room_events_max_id_txn
-        )
+        token = yield self._stream_id_gen.get_max_token(self)
+        defer.returnValue("s%d" % (token,))
 
     @defer.inlineCallbacks
     def _get_min_token(self):
@@ -433,27 +431,6 @@ class StreamStore(SQLBaseStore):
 
         defer.returnValue(self.min_token)
 
-    def get_next_stream_id(self):
-        with self._next_stream_id_lock:
-            i = self._next_stream_id
-            self._next_stream_id += 1
-            return i
-
-    def _get_room_events_max_id_txn(self, txn):
-        txn.execute(
-            "SELECT MAX(stream_ordering) as m FROM events"
-        )
-
-        res = self.cursor_to_dict(txn)
-
-        logger.debug("get_room_events_max_id: %s", res)
-
-        if not res or not res[0] or not res[0]["m"]:
-            return "s0"
-
-        key = res[0]["m"]
-        return "s%d" % (key,)
-
     @staticmethod
     def _set_before_and_after(events, rows):
         for event, row in zip(events, rows):
diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py
index b777395e06..89dd7d8947 100644
--- a/synapse/storage/transactions.py
+++ b/synapse/storage/transactions.py
@@ -13,7 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from ._base import SQLBaseStore, Table, cached
+from ._base import SQLBaseStore, cached
 
 from collections import namedtuple
 
@@ -76,22 +76,18 @@ class TransactionStore(SQLBaseStore):
             response_json (str)
         """
 
-        return self.runInteraction(
-            "set_received_txn_response",
-            self._set_received_txn_response,
-            transaction_id, origin, code, response_dict
+        return self._simple_insert(
+            table=ReceivedTransactionsTable.table_name,
+            values={
+                "transaction_id": transaction_id,
+                "origin": origin,
+                "response_code": code,
+                "response_json": response_dict,
+            },
+            or_ignore=True,
+            desc="set_received_txn_response",
         )
 
-    def _set_received_txn_response(self, txn, transaction_id, origin, code,
-                                   response_json):
-        query = (
-            "UPDATE %s "
-            "SET response_code = ?, response_json = ? "
-            "WHERE transaction_id = ? AND origin = ?"
-        ) % ReceivedTransactionsTable.table_name
-
-        txn.execute(query, (code, response_json, transaction_id, origin))
-
     def prep_send_transaction(self, transaction_id, destination,
                               origin_server_ts):
         """Persists an outgoing transaction and calculates the values for the
@@ -118,41 +114,38 @@ class TransactionStore(SQLBaseStore):
     def _prep_send_transaction(self, txn, transaction_id, destination,
                                origin_server_ts):
 
+        next_id = self._transaction_id_gen.get_next_txn(txn)
+
         # First we find out what the prev_txns should be.
         # Since we know that we are only sending one transaction at a time,
         # we can simply take the last one.
-        query = "%s ORDER BY id DESC LIMIT 1" % (
-                SentTransactions.select_statement("destination = ?"),
-            )
+        query = (
+            "SELECT * FROM sent_transactions"
+            " WHERE destination = ?"
+            " ORDER BY id DESC LIMIT 1"
+        )
 
-        results = txn.execute(query, (destination,))
-        results = SentTransactions.decode_results(results)
+        txn.execute(query, (destination,))
+        results = self.cursor_to_dict(txn)
 
-        prev_txns = [r.transaction_id for r in results]
+        prev_txns = [r["transaction_id"] for r in results]
 
         # Actually add the new transaction to the sent_transactions table.
 
-        query = SentTransactions.insert_statement()
-        txn.execute(query, SentTransactions.EntryType(
-            None,
-            transaction_id=transaction_id,
-            destination=destination,
-            ts=origin_server_ts,
-            response_code=0,
-            response_json=None
-        ))
-
-        # Update the tx id -> pdu id mapping
-
-        # values = [
-        #     (transaction_id, destination, pdu[0], pdu[1])
-        #     for pdu in pdu_list
-        # ]
-        #
-        # logger.debug("Inserting: %s", repr(values))
-        #
-        # query = TransactionsToPduTable.insert_statement()
-        # txn.executemany(query, values)
+        self._simple_insert_txn(
+            txn,
+            table=SentTransactions.table_name,
+            values={
+                "id": next_id,
+                "transaction_id": transaction_id,
+                "destination": destination,
+                "ts": origin_server_ts,
+                "response_code": 0,
+                "response_json": None,
+            }
+        )
+
+        # TODO Update the tx id -> pdu id mapping
 
         return prev_txns
 
@@ -171,15 +164,20 @@ class TransactionStore(SQLBaseStore):
             transaction_id, destination, code, response_dict
         )
 
-    def _delivered_txn(cls, txn, transaction_id, destination,
+    def _delivered_txn(self, txn, transaction_id, destination,
                        code, response_json):
-        query = (
-            "UPDATE %s "
-            "SET response_code = ?, response_json = ? "
-            "WHERE transaction_id = ? AND destination = ?"
-        ) % SentTransactions.table_name
-
-        txn.execute(query, (code, response_json, transaction_id, destination))
+        self._simple_update_one_txn(
+            txn,
+            table=SentTransactions.table_name,
+            keyvalues={
+                "transaction_id": transaction_id,
+                "destination": destination,
+            },
+            updatevalues={
+                "response_code": code,
+                "response_json": None,  # For now, don't persist response_json
+            }
+        )
 
     def get_transactions_after(self, transaction_id, destination):
         """Get all transactions after a given local transaction_id.
@@ -189,25 +187,26 @@ class TransactionStore(SQLBaseStore):
             destination (str)
 
         Returns:
-            list: A list of `ReceivedTransactionsTable.EntryType`
+            list: A list of dicts
         """
         return self.runInteraction(
             "get_transactions_after",
             self._get_transactions_after, transaction_id, destination
         )
 
-    def _get_transactions_after(cls, txn, transaction_id, destination):
-        where = (
-            "destination = ? AND id > (select id FROM %s WHERE "
-            "transaction_id = ? AND destination = ?)"
-        ) % (
-            SentTransactions.table_name
+    def _get_transactions_after(self, txn, transaction_id, destination):
+        query = (
+            "SELECT * FROM sent_transactions"
+            " WHERE destination = ? AND id >"
+            " ("
+            " SELECT id FROM sent_transactions"
+            " WHERE transaction_id = ? AND destination = ?"
+            " )"
         )
-        query = SentTransactions.select_statement(where)
 
         txn.execute(query, (destination, transaction_id, destination))
 
-        return ReceivedTransactionsTable.decode_results(txn.fetchall())
+        return self.cursor_to_dict(txn)
 
     @cached()
     def get_destination_retry_timings(self, destination):
@@ -218,22 +217,27 @@ class TransactionStore(SQLBaseStore):
 
         Returns:
             None if not retrying
-            Otherwise a DestinationsTable.EntryType for the retry scheme
+            Otherwise a dict for the retry scheme
         """
         return self.runInteraction(
             "get_destination_retry_timings",
             self._get_destination_retry_timings, destination)
 
-    def _get_destination_retry_timings(cls, txn, destination):
-        query = DestinationsTable.select_statement("destination = ?")
-        txn.execute(query, (destination,))
-        result = txn.fetchall()
-        if result:
-            result = DestinationsTable.decode_single_result(result)
-            if result.retry_last_ts > 0:
-                return result
-            else:
-                return None
+    def _get_destination_retry_timings(self, txn, destination):
+        result = self._simple_select_one_txn(
+            txn,
+            table=DestinationsTable.table_name,
+            keyvalues={
+                "destination": destination,
+            },
+            retcols=DestinationsTable.fields,
+            allow_none=True,
+        )
+
+        if result and result["retry_last_ts"] > 0:
+            return result
+        else:
+            return None
 
     def set_destination_retry_timings(self, destination,
                                       retry_last_ts, retry_interval):
@@ -249,11 +253,11 @@ class TransactionStore(SQLBaseStore):
         # As this is the new value, we might as well prefill the cache
         self.get_destination_retry_timings.prefill(
             destination,
-            DestinationsTable.EntryType(
-                destination,
-                retry_last_ts,
-                retry_interval
-            )
+            {
+                "destination": destination,
+                "retry_last_ts": retry_last_ts,
+                "retry_interval": retry_interval
+            },
         )
 
         # XXX: we could chose to not bother persisting this if our cache thinks
@@ -266,22 +270,38 @@ class TransactionStore(SQLBaseStore):
             retry_interval,
         )
 
-    def _set_destination_retry_timings(cls, txn, destination,
+    def _set_destination_retry_timings(self, txn, destination,
                                        retry_last_ts, retry_interval):
-
         query = (
-            "INSERT OR REPLACE INTO %s "
-            "(destination, retry_last_ts, retry_interval) "
-            "VALUES (?, ?, ?) "
-        ) % DestinationsTable.table_name
+            "UPDATE destinations"
+            " SET retry_last_ts = ?, retry_interval = ?"
+            " WHERE destination = ?"
+        )
+
+        txn.execute(
+            query,
+            (
+                retry_last_ts, retry_interval, destination,
+            )
+        )
 
-        txn.execute(query, (destination, retry_last_ts, retry_interval))
+        if txn.rowcount == 0:
+            # destination wasn't already in table. Insert it.
+            self._simple_insert_txn(
+                txn,
+                table="destinations",
+                values={
+                    "destination": destination,
+                    "retry_last_ts": retry_last_ts,
+                    "retry_interval": retry_interval,
+                }
+            )
 
     def get_destinations_needing_retry(self):
         """Get all destinations which are due a retry for sending a transaction.
 
         Returns:
-            list: A list of `DestinationsTable.EntryType`
+            list: A list of dicts
         """
 
         return self.runInteraction(
@@ -289,14 +309,17 @@ class TransactionStore(SQLBaseStore):
             self._get_destinations_needing_retry
         )
 
-    def _get_destinations_needing_retry(cls, txn):
-        where = "retry_last_ts > 0 and retry_next_ts < now()"
-        query = DestinationsTable.select_statement(where)
-        txn.execute(query)
-        return DestinationsTable.decode_results(txn.fetchall())
+    def _get_destinations_needing_retry(self, txn):
+        query = (
+            "SELECT * FROM destinations"
+            " WHERE retry_last_ts > 0 and retry_next_ts < ?"
+        )
+
+        txn.execute(query, (self._clock.time_msec(),))
+        return self.cursor_to_dict(txn)
 
 
-class ReceivedTransactionsTable(Table):
+class ReceivedTransactionsTable(object):
     table_name = "received_transactions"
 
     fields = [
@@ -308,10 +331,8 @@ class ReceivedTransactionsTable(Table):
         "has_been_referenced",
     ]
 
-    EntryType = namedtuple("ReceivedTransactionsEntry", fields)
 
-
-class SentTransactions(Table):
+class SentTransactions(object):
     table_name = "sent_transactions"
 
     fields = [
@@ -326,7 +347,7 @@ class SentTransactions(Table):
     EntryType = namedtuple("SentTransactionsEntry", fields)
 
 
-class TransactionsToPduTable(Table):
+class TransactionsToPduTable(object):
     table_name = "transaction_id_to_pdu"
 
     fields = [
@@ -336,10 +357,8 @@ class TransactionsToPduTable(Table):
         "pdu_origin",
     ]
 
-    EntryType = namedtuple("TransactionsToPduEntry", fields)
-
 
-class DestinationsTable(Table):
+class DestinationsTable(object):
     table_name = "destinations"
 
     fields = [
@@ -347,5 +366,3 @@ class DestinationsTable(Table):
         "retry_last_ts",
         "retry_interval",
     ]
-
-    EntryType = namedtuple("DestinationsEntry", fields)
diff --git a/synapse/storage/util/__init__.py b/synapse/storage/util/__init__.py
new file mode 100644
index 0000000000..c488b10d3c
--- /dev/null
+++ b/synapse/storage/util/__init__.py
@@ -0,0 +1,14 @@
+# -*- coding: utf-8 -*-
+# Copyright 2014, 2015 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py
new file mode 100644
index 0000000000..9d461d5e96
--- /dev/null
+++ b/synapse/storage/util/id_generators.py
@@ -0,0 +1,131 @@
+# -*- coding: utf-8 -*-
+# Copyright 2014, 2015 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from twisted.internet import defer
+
+from collections import deque
+import contextlib
+import threading
+
+
+class IdGenerator(object):
+    def __init__(self, table, column, store):
+        self.table = table
+        self.column = column
+        self.store = store
+        self._lock = threading.Lock()
+        self._next_id = None
+
+    @defer.inlineCallbacks
+    def get_next(self):
+        with self._lock:
+            if not self._next_id:
+                res = yield self.store._execute_and_decode(
+                    "IdGenerator_%s" % (self.table,),
+                    "SELECT MAX(%s) as mx FROM %s" % (self.column, self.table,)
+                )
+
+                self._next_id = (res and res[0] and res[0]["mx"]) or 1
+
+            i = self._next_id
+            self._next_id += 1
+            defer.returnValue(i)
+
+    def get_next_txn(self, txn):
+        with self._lock:
+            if self._next_id:
+                i = self._next_id
+                self._next_id += 1
+                return i
+            else:
+                txn.execute(
+                    "SELECT MAX(%s) FROM %s" % (self.column, self.table,)
+                )
+
+                val, = txn.fetchone()
+                cur = val or 0
+                cur += 1
+                self._next_id = cur + 1
+
+                return cur
+
+
+class StreamIdGenerator(object):
+    """Used to generate new stream ids when persisting events while keeping
+    track of which transactions have been completed.
+
+    This allows us to get the "current" stream id, i.e. the stream id such that
+    all ids less than or equal to it have completed. This handles the fact that
+    persistence of events can complete out of order.
+
+    Usage:
+        with stream_id_gen.get_next_txn(txn) as stream_id:
+            # ... persist event ...
+    """
+    def __init__(self):
+        self._lock = threading.Lock()
+
+        self._current_max = None
+        self._unfinished_ids = deque()
+
+    def get_next_txn(self, txn):
+        """
+        Usage:
+            with stream_id_gen.get_next_txn(txn) as stream_id:
+                # ... persist event ...
+        """
+        with self._lock:
+            if not self._current_max:
+                self._compute_current_max(txn)
+
+            self._current_max += 1
+            next_id = self._current_max
+
+            self._unfinished_ids.append(next_id)
+
+        @contextlib.contextmanager
+        def manager():
+            try:
+                yield next_id
+            finally:
+                with self._lock:
+                    self._unfinished_ids.remove(next_id)
+
+        return manager()
+
+    @defer.inlineCallbacks
+    def get_max_token(self, store):
+        """Returns the maximum stream id such that all stream ids less than or
+        equal to it have been successfully persisted.
+        """
+        with self._lock:
+            if self._unfinished_ids:
+                defer.returnValue(self._unfinished_ids[0] - 1)
+
+            if not self._current_max:
+                yield store.runInteraction(
+                    "_compute_current_max",
+                    self._compute_current_max,
+                )
+
+            defer.returnValue(self._current_max)
+
+    def _compute_current_max(self, txn):
+        txn.execute("SELECT MAX(stream_ordering) FROM events")
+        val, = txn.fetchone()
+
+        self._current_max = int(val) if val else 1
+
+        return self._current_max