summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/__init__.py137
-rw-r--r--synapse/storage/_base.py125
-rw-r--r--synapse/storage/appservice.py6
-rw-r--r--synapse/storage/directory.py8
-rw-r--r--synapse/storage/engines/__init__.py37
-rw-r--r--synapse/storage/engines/maria.py47
-rw-r--r--synapse/storage/engines/sqlite3.py37
-rw-r--r--synapse/storage/event_federation.py25
-rw-r--r--synapse/storage/events.py90
-rw-r--r--synapse/storage/keys.py16
-rw-r--r--synapse/storage/presence.py1
-rw-r--r--synapse/storage/profile.py12
-rw-r--r--synapse/storage/push_rule.py4
-rw-r--r--synapse/storage/registration.py50
-rw-r--r--synapse/storage/room.py11
-rw-r--r--synapse/storage/roommember.py148
-rw-r--r--synapse/storage/schema/delta/12/v12.sql58
-rw-r--r--synapse/storage/schema/delta/13/v13.sql25
-rw-r--r--synapse/storage/schema/delta/14/v14.sql8
-rw-r--r--synapse/storage/schema/delta/15/appservice_txns.sql17
-rw-r--r--synapse/storage/schema/delta/15/presence_indices.sql2
-rw-r--r--synapse/storage/schema/full_schemas/11/event_edges.sql68
-rw-r--r--synapse/storage/schema/full_schemas/11/event_signatures.sql54
-rw-r--r--synapse/storage/schema/full_schemas/11/im.sql121
-rw-r--r--synapse/storage/schema/full_schemas/11/keys.sql24
-rw-r--r--synapse/storage/schema/full_schemas/11/media_repository.sql60
-rw-r--r--synapse/storage/schema/full_schemas/11/presence.sql30
-rw-r--r--synapse/storage/schema/full_schemas/11/profiles.sql10
-rw-r--r--synapse/storage/schema/full_schemas/11/redactions.sql8
-rw-r--r--synapse/storage/schema/full_schemas/11/room_aliases.sql16
-rw-r--r--synapse/storage/schema/full_schemas/11/state.sql41
-rw-r--r--synapse/storage/schema/full_schemas/11/transactions.sql45
-rw-r--r--synapse/storage/schema/full_schemas/11/users.sql43
-rw-r--r--synapse/storage/schema/schema_version.sql15
-rw-r--r--synapse/storage/signatures.py6
-rw-r--r--synapse/storage/state.py13
-rw-r--r--synapse/storage/stream.py33
-rw-r--r--synapse/storage/transactions.py208
-rw-r--r--synapse/storage/util/__init__.py14
-rw-r--r--synapse/storage/util/id_generators.py126
40 files changed, 1069 insertions, 730 deletions
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index f4dec70393..87db382fbb 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -104,14 +104,16 @@ class DataStore(RoomMemberStore, RoomStore,
 
         self.client_ip_last_seen.prefill(*key + (now,))
 
-        yield self._simple_insert(
+        yield self._simple_upsert(
             "user_ips",
-            {
+            keyvalues={
                 "user": user.to_string(),
                 "access_token": access_token,
-                "device_id": device_id,
                 "ip": ip,
                 "user_agent": user_agent,
+            },
+            values={
+                "device_id": device_id,
                 "last_seen": now,
             },
             desc="insert_client_ip",
@@ -148,21 +150,23 @@ class UpgradeDatabaseException(PrepareDatabaseException):
     pass
 
 
-def prepare_database(db_conn):
+def prepare_database(db_conn, database_engine):
     """Prepares a database for usage. Will either create all necessary tables
     or upgrade from an older schema version.
     """
     try:
         cur = db_conn.cursor()
-        version_info = _get_or_create_schema_state(cur)
+        version_info = _get_or_create_schema_state(cur, database_engine)
 
         if version_info:
             user_version, delta_files, upgraded = version_info
-            _upgrade_existing_database(cur, user_version, delta_files, upgraded)
+            _upgrade_existing_database(
+                cur, user_version, delta_files, upgraded, database_engine
+            )
         else:
-            _setup_new_database(cur)
+            _setup_new_database(cur, database_engine)
 
-        cur.execute("PRAGMA user_version = %d" % (SCHEMA_VERSION,))
+        # cur.execute("PRAGMA user_version = %d" % (SCHEMA_VERSION,))
 
         cur.close()
         db_conn.commit()
@@ -171,7 +175,7 @@ def prepare_database(db_conn):
         raise
 
 
-def _setup_new_database(cur):
+def _setup_new_database(cur, database_engine):
     """Sets up the database by finding a base set of "full schemas" and then
     applying any necessary deltas.
 
@@ -225,31 +229,30 @@ def _setup_new_database(cur):
 
     directory_entries = os.listdir(sql_dir)
 
-    sql_script = "BEGIN TRANSACTION;\n"
     for filename in fnmatch.filter(directory_entries, "*.sql"):
         sql_loc = os.path.join(sql_dir, filename)
         logger.debug("Applying schema %s", sql_loc)
-        sql_script += read_schema(sql_loc)
-        sql_script += "\n"
-    sql_script += "COMMIT TRANSACTION;"
-    cur.executescript(sql_script)
+        executescript(cur, sql_loc)
 
     cur.execute(
-        "INSERT OR REPLACE INTO schema_version (version, upgraded)"
-        " VALUES (?,?)",
-        (max_current_ver, False)
+        database_engine.convert_param_style(
+            "REPLACE INTO schema_version (version, upgraded)"
+            " VALUES (?,?)"
+        ),
+        (max_current_ver, False,)
     )
 
     _upgrade_existing_database(
         cur,
         current_version=max_current_ver,
         applied_delta_files=[],
-        upgraded=False
+        upgraded=False,
+        database_engine=database_engine,
     )
 
 
 def _upgrade_existing_database(cur, current_version, applied_delta_files,
-                               upgraded):
+                               upgraded, database_engine):
     """Upgrades an existing database.
 
     Delta files can either be SQL stored in *.sql files, or python modules
@@ -305,6 +308,8 @@ def _upgrade_existing_database(cur, current_version, applied_delta_files,
     if not upgraded:
         start_ver += 1
 
+    logger.debug("applied_delta_files: %s", applied_delta_files)
+
     for v in range(start_ver, SCHEMA_VERSION + 1):
         logger.debug("Upgrading schema to v%d", v)
 
@@ -321,6 +326,7 @@ def _upgrade_existing_database(cur, current_version, applied_delta_files,
         directory_entries.sort()
         for file_name in directory_entries:
             relative_path = os.path.join(str(v), file_name)
+            logger.debug("Found file: %s", relative_path)
             if relative_path in applied_delta_files:
                 continue
 
@@ -342,9 +348,8 @@ def _upgrade_existing_database(cur, current_version, applied_delta_files,
                 module.run_upgrade(cur)
             elif ext == ".sql":
                 # A plain old .sql file, just read and execute it
-                delta_schema = read_schema(absolute_path)
                 logger.debug("Applying schema %s", relative_path)
-                cur.executescript(delta_schema)
+                executescript(cur, absolute_path)
             else:
                 # Not a valid delta file.
                 logger.warn(
@@ -356,24 +361,85 @@ def _upgrade_existing_database(cur, current_version, applied_delta_files,
 
             # Mark as done.
             cur.execute(
-                "INSERT INTO applied_schema_deltas (version, file)"
-                " VALUES (?,?)",
+                database_engine.convert_param_style(
+                    "INSERT INTO applied_schema_deltas (version, file)"
+                    " VALUES (?,?)",
+                ),
                 (v, relative_path)
             )
 
             cur.execute(
-                "INSERT OR REPLACE INTO schema_version (version, upgraded)"
-                " VALUES (?,?)",
+                database_engine.convert_param_style(
+                    "REPLACE INTO schema_version (version, upgraded)"
+                    " VALUES (?,?)",
+                ),
                 (v, True)
             )
 
 
-def _get_or_create_schema_state(txn):
-    schema_path = os.path.join(
-        dir_path, "schema", "schema_version.sql",
-    )
-    create_schema = read_schema(schema_path)
-    txn.executescript(create_schema)
+def get_statements(f):
+    statement_buffer = ""
+    in_comment = False  # If we're in a /* ... */ style comment
+
+    for line in f:
+        line = line.strip()
+
+        if in_comment:
+            # Check if this line contains an end to the comment
+            comments = line.split("*/", 1)
+            if len(comments) == 1:
+                continue
+            line = comments[1]
+            in_comment = False
+
+        # Remove inline block comments
+        line = re.sub(r"/\*.*\*/", " ", line)
+
+        # Does this line start a comment?
+        comments = line.split("/*", 1)
+        if len(comments) > 1:
+            line = comments[0]
+            in_comment = True
+
+        # Deal with line comments
+        line = line.split("--", 1)[0]
+        line = line.split("//", 1)[0]
+
+        # Find *all* semicolons. We need to treat first and last entry
+        # specially.
+        statements = line.split(";")
+
+        # We must prepend statement_buffer to the first statement
+        first_statement = "%s %s" % (
+            statement_buffer.strip(),
+            statements[0].strip()
+        )
+        statements[0] = first_statement
+
+        # Every entry, except the last, is a full statement
+        for statement in statements[:-1]:
+            yield statement.strip()
+
+        # The last entry did *not* end in a semicolon, so we store it for the
+        # next semicolon we find
+        statement_buffer = statements[-1].strip()
+
+
+def executescript(txn, schema_path):
+    with open(schema_path, 'r') as f:
+        for statement in get_statements(f):
+            txn.execute(statement)
+
+
+def _get_or_create_schema_state(txn, database_engine):
+    try:
+        # Bluntly try creating the schema_version tables.
+        schema_path = os.path.join(
+            dir_path, "schema", "schema_version.sql",
+        )
+        executescript(txn, schema_path)
+    except:
+        pass
 
     txn.execute("SELECT version, upgraded FROM schema_version")
     row = txn.fetchone()
@@ -382,10 +448,13 @@ def _get_or_create_schema_state(txn):
 
     if current_version:
         txn.execute(
-            "SELECT file FROM applied_schema_deltas WHERE version >= ?",
+            database_engine.convert_param_style(
+                "SELECT file FROM applied_schema_deltas WHERE version >= ?"
+            ),
             (current_version,)
         )
-        return current_version, txn.fetchall(), upgraded
+        applied_deltas = [d for d, in txn.fetchall()]
+        return current_version, applied_deltas, upgraded
 
     return None
 
@@ -417,7 +486,7 @@ def prepare_sqlite3_database(db_conn):
 
             if row and row[0]:
                 db_conn.execute(
-                    "INSERT OR REPLACE INTO schema_version (version, upgraded)"
+                    "REPLACE INTO schema_version (version, upgraded)"
                     " VALUES (?,?)",
                     (row[0], False)
                 )
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index e3e67d8e0d..c1bf98cdcb 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -22,6 +22,8 @@ from synapse.util.logcontext import PreserveLoggingContext, LoggingContext
 from synapse.util.lrucache import LruCache
 import synapse.metrics
 
+from util.id_generators import IdGenerator, StreamIdGenerator
+
 from twisted.internet import defer
 
 from collections import namedtuple, OrderedDict
@@ -145,11 +147,12 @@ class LoggingTransaction(object):
     """An object that almost-transparently proxies for the 'txn' object
     passed to the constructor. Adds logging and metrics to the .execute()
     method."""
-    __slots__ = ["txn", "name"]
+    __slots__ = ["txn", "name", "database_engine"]
 
-    def __init__(self, txn, name):
+    def __init__(self, txn, name, database_engine):
         object.__setattr__(self, "txn", txn)
         object.__setattr__(self, "name", name)
+        object.__setattr__(self, "database_engine", database_engine)
 
     def __getattr__(self, name):
         return getattr(self.txn, name)
@@ -161,26 +164,32 @@ class LoggingTransaction(object):
         # TODO(paul): Maybe use 'info' and 'debug' for values?
         sql_logger.debug("[SQL] {%s} %s", self.name, sql)
 
-        try:
-            if args and args[0]:
-                values = args[0]
+        sql = self.database_engine.convert_param_style(sql)
+
+        if args and args[0]:
+            args = list(args)
+            args[0] = [
+                self.database_engine.encode_parameter(a) for a in args[0]
+            ]
+            try:
                 sql_logger.debug(
-                    "[SQL values] {%s} " + ", ".join(("<%r>",) * len(values)),
+                    "[SQL values] {%s} " + ", ".join(("<%r>",) * len(args[0])),
                     self.name,
-                    *values
+                    *args[0]
                 )
-        except:
-            # Don't let logging failures stop SQL from working
-            pass
+            except:
+                # Don't let logging failures stop SQL from working
+                pass
 
         start = time.time() * 1000
+
         try:
             return self.txn.execute(
                 sql, *args, **kwargs
             )
-        except:
-                logger.exception("[SQL FAIL] {%s}", self.name)
-                raise
+        except Exception as e:
+            logger.debug("[SQL FAIL] {%s} %s", self.name, e)
+            raise
         finally:
             msecs = (time.time() * 1000) - start
             sql_logger.debug("[SQL time] {%s} %f", self.name, msecs)
@@ -245,6 +254,13 @@ class SQLBaseStore(object):
         self._get_event_cache = Cache("*getEvent*", keylen=3, lru=True,
                                       max_entries=hs.config.event_cache_size)
 
+        self.database_engine = hs.database_engine
+
+        self._stream_id_gen = StreamIdGenerator()
+        self._transaction_id_gen = IdGenerator("sent_transactions", "id", self)
+        self._state_groups_id_gen = IdGenerator("state_groups", "id", self)
+        self._access_tokens_id_gen = IdGenerator("access_tokens", "id", self)
+
     def start_profiling(self):
         self._previous_loop_ts = self._clock.time_msec()
 
@@ -281,7 +297,7 @@ class SQLBaseStore(object):
 
         start_time = time.time() * 1000
 
-        def inner_func(txn, *args, **kwargs):
+        def inner_func(conn, *args, **kwargs):
             with LoggingContext("runInteraction") as context:
                 current_context.copy_to(context)
                 start = time.time() * 1000
@@ -296,9 +312,25 @@ class SQLBaseStore(object):
                 sql_scheduling_timer.inc_by(time.time() * 1000 - start_time)
                 transaction_logger.debug("[TXN START] {%s}", name)
                 try:
-                    return func(LoggingTransaction(txn, name), *args, **kwargs)
-                except:
-                    logger.exception("[TXN FAIL] {%s}", name)
+                    i = 0
+                    N = 5
+                    while True:
+                        try:
+                            txn = conn.cursor()
+                            return func(
+                                LoggingTransaction(txn, name, self.database_engine),
+                                *args, **kwargs
+                            )
+                        except self.database_engine.module.DatabaseError as e:
+                            if self.database_engine.is_deadlock(e):
+                                logger.warn("[TXN DEADLOCK] {%s} %d/%d", name, i, N)
+                                if i < N:
+                                    i += 1
+                                    conn.rollback()
+                                    continue
+                            raise
+                except Exception as e:
+                    logger.debug("[TXN FAIL] {%s}", name, e)
                     raise
                 finally:
                     end = time.time() * 1000
@@ -311,7 +343,7 @@ class SQLBaseStore(object):
                     sql_txn_timer.inc_by(duration, desc)
 
         with PreserveLoggingContext():
-            result = yield self._db_pool.runInteraction(
+            result = yield self._db_pool.runWithConnection(
                 inner_func, *args, **kwargs
             )
         defer.returnValue(result)
@@ -342,11 +374,11 @@ class SQLBaseStore(object):
             The result of decoder(results)
         """
         def interaction(txn):
-            cursor = txn.execute(query, args)
+            txn.execute(query, args)
             if decoder:
-                return decoder(cursor)
+                return decoder(txn)
             else:
-                return cursor.fetchall()
+                return txn.fetchall()
 
         return self.runInteraction(desc, interaction)
 
@@ -356,27 +388,23 @@ class SQLBaseStore(object):
     # "Simple" SQL API methods that operate on a single table with no JOINs,
     # no complex WHERE clauses, just a dict of values for columns.
 
-    def _simple_insert(self, table, values, or_replace=False, or_ignore=False,
+    def _simple_insert(self, table, values, or_ignore=False,
                        desc="_simple_insert"):
         """Executes an INSERT query on the named table.
 
         Args:
             table : string giving the table name
             values : dict of new column names and values for them
-            or_replace : bool; if True performs an INSERT OR REPLACE
         """
         return self.runInteraction(
             desc,
-            self._simple_insert_txn, table, values, or_replace=or_replace,
-            or_ignore=or_ignore,
+            self._simple_insert_txn, table, values,
+            or_ignore=or_ignore
         )
 
     @log_function
-    def _simple_insert_txn(self, txn, table, values, or_replace=False,
-                           or_ignore=False):
-        sql = "%s INTO %s (%s) VALUES(%s)" % (
-            ("INSERT OR REPLACE" if or_replace else
-             "INSERT OR IGNORE" if or_ignore else "INSERT"),
+    def _simple_insert_txn(self, txn, table, values, or_ignore=False):
+        sql = "INSERT INTO %s (%s) VALUES(%s)" % (
             table,
             ", ".join(k for k in values),
             ", ".join("?" for k in values)
@@ -387,8 +415,11 @@ class SQLBaseStore(object):
             sql, values.values(),
         )
 
-        txn.execute(sql, values.values())
-        return txn.lastrowid
+        try:
+            txn.execute(sql, values.values())
+        except self.database_engine.module.IntegrityError:
+            if not or_ignore:
+                raise
 
     def _simple_upsert(self, table, keyvalues, values, desc="_simple_upsert"):
         """
@@ -489,8 +520,7 @@ class SQLBaseStore(object):
 
     def _simple_select_onecol_txn(self, txn, table, keyvalues, retcol):
         sql = (
-            "SELECT %(retcol)s FROM %(table)s WHERE %(where)s "
-            "ORDER BY rowid asc"
+            "SELECT %(retcol)s FROM %(table)s WHERE %(where)s"
         ) % {
             "retcol": retcol,
             "table": table,
@@ -548,14 +578,14 @@ class SQLBaseStore(object):
             retcols : list of strings giving the names of the columns to return
         """
         if keyvalues:
-            sql = "SELECT %s FROM %s WHERE %s ORDER BY rowid asc" % (
+            sql = "SELECT %s FROM %s WHERE %s" % (
                 ", ".join(retcols),
                 table,
                 " AND ".join("%s = ?" % (k, ) for k in keyvalues)
             )
             txn.execute(sql, keyvalues.values())
         else:
-            sql = "SELECT %s FROM %s ORDER BY rowid asc" % (
+            sql = "SELECT %s FROM %s" % (
                 ", ".join(retcols),
                 table
             )
@@ -607,10 +637,10 @@ class SQLBaseStore(object):
 
     def _simple_select_one_txn(self, txn, table, keyvalues, retcols,
                                allow_none=False):
-        select_sql = "SELECT %s FROM %s WHERE %s ORDER BY rowid asc" % (
+        select_sql = "SELECT %s FROM %s WHERE %s" % (
             ", ".join(retcols),
             table,
-            " AND ".join("%s = ?" % (k) for k in keyvalues)
+            " AND ".join("%s = ?" % (k,) for k in keyvalues)
         )
 
         txn.execute(select_sql, keyvalues.values())
@@ -648,6 +678,11 @@ class SQLBaseStore(object):
                     updatevalues=updatevalues,
                 )
 
+                # if txn.rowcount == 0:
+                #     raise StoreError(404, "No row found")
+                if txn.rowcount > 1:
+                    raise StoreError(500, "More than one row matched")
+
             return ret
         return self.runInteraction(desc, func)
 
@@ -803,10 +838,12 @@ class SQLBaseStore(object):
             sql_getevents_timer.inc_by(curr_time - last_time, desc)
             return curr_time
 
-        d = json.loads(js)
+        logger.debug("Got js: %r", js)
+        d = json.loads(str(js).decode("utf8"))
         start_time = update_counter("decode_json", start_time)
 
-        internal_metadata = json.loads(internal_metadata)
+        logger.debug("Got internal_metadata: %r", internal_metadata)
+        internal_metadata = json.loads(str(internal_metadata).decode("utf8"))
         start_time = update_counter("decode_internal", start_time)
 
         ev = FrozenEvent(
@@ -860,6 +897,12 @@ class SQLBaseStore(object):
         result = txn.fetchone()
         return result[0] if result else None
 
+    def get_next_stream_id(self):
+        with self._next_stream_id_lock:
+            i = self._next_stream_id
+            self._next_stream_id += 1
+            return i
+
 
 class _RollbackButIsFineException(Exception):
     """ This exception is used to rollback a transaction without implying
@@ -883,7 +926,7 @@ class Table(object):
 
     _select_where_clause = "SELECT %s FROM %s WHERE %s"
     _select_clause = "SELECT %s FROM %s"
-    _insert_clause = "INSERT OR REPLACE INTO %s (%s) VALUES (%s)"
+    _insert_clause = "REPLACE INTO %s (%s) VALUES (%s)"
 
     @classmethod
     def select_statement(cls, where_clause=None):
diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py
index f8cbb3f323..40e05b3635 100644
--- a/synapse/storage/appservice.py
+++ b/synapse/storage/appservice.py
@@ -366,11 +366,13 @@ class ApplicationServiceTransactionStore(SQLBaseStore):
         new_txn_id = max(highest_txn_id, last_txn_id) + 1
 
         # Insert new txn into txn table
-        event_ids = [e.event_id for e in events]
+        event_ids = buffer(
+            json.dumps([e.event_id for e in events]).encode("utf8")
+        )
         txn.execute(
             "INSERT INTO application_services_txns(as_id, txn_id, event_ids) "
             "VALUES(?,?,?)",
-            (service.id, new_txn_id, json.dumps(event_ids))
+            (service.id, new_txn_id, event_ids)
         )
         return AppServiceTransaction(
             service=service, id=new_txn_id, events=events
diff --git a/synapse/storage/directory.py b/synapse/storage/directory.py
index 0199539fea..2b2bdf8615 100644
--- a/synapse/storage/directory.py
+++ b/synapse/storage/directory.py
@@ -21,8 +21,6 @@ from twisted.internet import defer
 
 from collections import namedtuple
 
-import sqlite3
-
 
 RoomAliasMapping = namedtuple(
     "RoomAliasMapping",
@@ -91,7 +89,7 @@ class DirectoryStore(SQLBaseStore):
                 },
                 desc="create_room_alias_association",
             )
-        except sqlite3.IntegrityError:
+        except self.database_engine.module.IntegrityError:
             raise SynapseError(
                 409, "Room alias %s already exists" % room_alias.to_string()
             )
@@ -120,12 +118,12 @@ class DirectoryStore(SQLBaseStore):
         defer.returnValue(room_id)
 
     def _delete_room_alias_txn(self, txn, room_alias):
-        cursor = txn.execute(
+        txn.execute(
             "SELECT room_id FROM room_aliases WHERE room_alias = ?",
             (room_alias.to_string(),)
         )
 
-        res = cursor.fetchone()
+        res = txn.fetchone()
         if res:
             room_id = res[0]
         else:
diff --git a/synapse/storage/engines/__init__.py b/synapse/storage/engines/__init__.py
new file mode 100644
index 0000000000..29702be923
--- /dev/null
+++ b/synapse/storage/engines/__init__.py
@@ -0,0 +1,37 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from .maria import MariaEngine
+from .sqlite3 import Sqlite3Engine
+
+import importlib
+
+
+SUPPORTED_MODULE = {
+    "sqlite3": Sqlite3Engine,
+    "mysql.connector": MariaEngine,
+}
+
+
+def create_engine(name):
+    engine_class = SUPPORTED_MODULE.get(name, None)
+
+    if engine_class:
+        module = importlib.import_module(name)
+        return engine_class(module)
+
+    raise RuntimeError(
+        "Unsupported database engine '%s'" % (name,)
+    )
diff --git a/synapse/storage/engines/maria.py b/synapse/storage/engines/maria.py
new file mode 100644
index 0000000000..230b32858c
--- /dev/null
+++ b/synapse/storage/engines/maria.py
@@ -0,0 +1,47 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from synapse.storage import prepare_database
+
+import types
+
+
+class MariaEngine(object):
+    def __init__(self, database_module):
+        self.module = database_module
+
+    def convert_param_style(self, sql):
+        return sql.replace("?", "%s")
+
+    def encode_parameter(self, param):
+        if isinstance(param, types.BufferType):
+            return str(param)
+        return param
+
+    def on_new_connection(self, db_conn):
+        pass
+
+    def prepare_database(self, db_conn):
+        cur = db_conn.cursor()
+        cur.execute(
+            "ALTER DATABASE CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci"
+        )
+        db_conn.commit()
+        prepare_database(db_conn, self)
+
+    def is_deadlock(self, error):
+        if isinstance(error, self.module.DatabaseError):
+            return error.sqlstate == "40001" and error.errno == 1213
+        return False
diff --git a/synapse/storage/engines/sqlite3.py b/synapse/storage/engines/sqlite3.py
new file mode 100644
index 0000000000..72c11df461
--- /dev/null
+++ b/synapse/storage/engines/sqlite3.py
@@ -0,0 +1,37 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from synapse.storage import prepare_database, prepare_sqlite3_database
+
+
+class Sqlite3Engine(object):
+    def __init__(self, database_module):
+        self.module = database_module
+
+    def convert_param_style(self, sql):
+        return sql
+
+    def encode_parameter(self, param):
+        return param
+
+    def on_new_connection(self, db_conn):
+        self.prepare_database(db_conn)
+
+    def prepare_database(self, db_conn):
+        prepare_sqlite3_database(db_conn)
+        prepare_database(db_conn, self)
+
+    def is_deadlock(self, error):
+        return False
diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py
index 032334bfd6..79ad5ddc9c 100644
--- a/synapse/storage/event_federation.py
+++ b/synapse/storage/event_federation.py
@@ -242,7 +242,6 @@ class EventFederationStore(SQLBaseStore):
                     "room_id": room_id,
                     "min_depth": depth,
                 },
-                or_replace=True,
             )
 
     def _handle_prev_events(self, txn, outlier, event_id, prev_events,
@@ -262,7 +261,6 @@ class EventFederationStore(SQLBaseStore):
                     "room_id": room_id,
                     "is_state": 0,
                 },
-                or_ignore=True,
             )
 
         # Update the extremities table if this is not an outlier.
@@ -281,19 +279,19 @@ class EventFederationStore(SQLBaseStore):
             # We only insert as a forward extremity the new event if there are
             # no other events that reference it as a prev event
             query = (
-                "INSERT OR IGNORE INTO %(table)s (event_id, room_id) "
-                "SELECT ?, ? WHERE NOT EXISTS ("
-                "SELECT 1 FROM %(event_edges)s WHERE "
-                "prev_event_id = ? "
-                ")"
-            ) % {
-                "table": "event_forward_extremities",
-                "event_edges": "event_edges",
-            }
+                "SELECT 1 FROM event_edges WHERE prev_event_id = ?"
+            )
 
-            logger.debug("query: %s", query)
+            txn.execute(query, (event_id,))
+
+            if not txn.fetchone():
+                query = (
+                    "INSERT INTO event_forward_extremities"
+                    " (event_id, room_id)"
+                    " VALUES (?, ?)"
+                )
 
-            txn.execute(query, (event_id, room_id, event_id))
+                txn.execute(query, (event_id, room_id))
 
             # Insert all the prev_events as a backwards thing, they'll get
             # deleted in a second if they're incorrect anyway.
@@ -306,7 +304,6 @@ class EventFederationStore(SQLBaseStore):
                         "event_id": e_id,
                         "room_id": room_id,
                     },
-                    or_ignore=True,
                 )
 
             # Also delete from the backwards extremities table all ones that
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 2425f57f5f..a2e87c27ce 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -52,7 +52,6 @@ class EventsStore(SQLBaseStore):
                 is_new_state=is_new_state,
                 current_state=current_state,
             )
-            self.get_room_events_max_id.invalidate()
         except _RollbackButIsFineException:
             pass
 
@@ -96,12 +95,22 @@ class EventsStore(SQLBaseStore):
         # Remove the any existing cache entries for the event_id
         self._invalidate_get_event_cache(event.event_id)
 
+        if stream_ordering is None:
+            with self._stream_id_gen.get_next_txn(txn) as stream_ordering:
+                return self._persist_event_txn(
+                    txn, event, context, backfilled,
+                    stream_ordering=stream_ordering,
+                    is_new_state=is_new_state,
+                    current_state=current_state,
+                )
+
         # We purposefully do this first since if we include a `current_state`
         # key, we *want* to update the `current_state_events` table
         if current_state:
-            txn.execute(
-                "DELETE FROM current_state_events WHERE room_id = ?",
-                (event.room_id,)
+            self._simple_delete_txn(
+                txn,
+                table="current_state_events",
+                keyvalues={"room_id": event.room_id},
             )
 
             for s in current_state:
@@ -114,7 +123,6 @@ class EventsStore(SQLBaseStore):
                         "type": s.type,
                         "state_key": s.state_key,
                     },
-                    or_replace=True,
                 )
 
         if event.is_state() and is_new_state:
@@ -128,7 +136,6 @@ class EventsStore(SQLBaseStore):
                         "type": event.type,
                         "state_key": event.state_key,
                     },
-                    or_replace=True,
                 )
 
                 for prev_state_id, _ in event.prev_state:
@@ -151,14 +158,6 @@ class EventsStore(SQLBaseStore):
                 event.depth
             )
 
-        self._handle_prev_events(
-            txn,
-            outlier=outlier,
-            event_id=event.event_id,
-            prev_events=event.prev_events,
-            room_id=event.room_id,
-        )
-
         have_persisted = self._simple_select_one_onecol_txn(
             txn,
             table="event_json",
@@ -185,7 +184,7 @@ class EventsStore(SQLBaseStore):
                 )
                 txn.execute(
                     sql,
-                    (metadata_json.decode("UTF-8"), event.event_id,)
+                    (buffer(metadata_json), event.event_id,)
                 )
 
                 sql = (
@@ -198,10 +197,16 @@ class EventsStore(SQLBaseStore):
                 )
             return
 
+        self._handle_prev_events(
+            txn,
+            outlier=outlier,
+            event_id=event.event_id,
+            prev_events=event.prev_events,
+            room_id=event.room_id,
+        )
+
         if event.type == EventTypes.Member:
             self._store_room_member_txn(txn, event)
-        elif event.type == EventTypes.Feedback:
-            self._store_feedback_txn(txn, event)
         elif event.type == EventTypes.Name:
             self._store_room_name_txn(txn, event)
         elif event.type == EventTypes.Topic:
@@ -224,15 +229,14 @@ class EventsStore(SQLBaseStore):
             values={
                 "event_id": event.event_id,
                 "room_id": event.room_id,
-                "internal_metadata": metadata_json.decode("UTF-8"),
-                "json": encode_canonical_json(event_dict).decode("UTF-8"),
+                "internal_metadata": buffer(metadata_json),
+                "json": buffer(encode_canonical_json(event_dict)),
             },
-            or_replace=True,
         )
 
-        content = encode_canonical_json(
+        content = buffer(encode_canonical_json(
             event.content
-        ).decode("UTF-8")
+        ))
 
         vals = {
             "topological_ordering": event.depth,
@@ -245,9 +249,6 @@ class EventsStore(SQLBaseStore):
             "depth": event.depth,
         }
 
-        if stream_ordering is not None:
-            vals["stream_ordering"] = stream_ordering
-
         unrec = {
             k: v
             for k, v in event.get_dict().items()
@@ -260,25 +261,22 @@ class EventsStore(SQLBaseStore):
             ]
         }
 
-        vals["unrecognized_keys"] = encode_canonical_json(
+        vals["unrecognized_keys"] = buffer(encode_canonical_json(
             unrec
-        ).decode("UTF-8")
+        ))
 
-        try:
-            self._simple_insert_txn(
-                txn,
-                "events",
-                vals,
-                or_replace=(not outlier),
-                or_ignore=bool(outlier),
-            )
-        except:
-            logger.warn(
-                "Failed to persist, probably duplicate: %s",
-                event.event_id,
-                exc_info=True,
-            )
-            raise _RollbackButIsFineException("_persist_event")
+        sql = (
+            "INSERT INTO events"
+            " (stream_ordering, topological_ordering, event_id, type,"
+            " room_id, content, processed, outlier, depth)"
+            " VALUES (%s,?,?,?,?,?,?,?,?)"
+        ) % (stream_ordering,)
+
+        txn.execute(
+            sql,
+            (event.depth, event.event_id, event.type, event.room_id,
+             content, True, outlier, event.depth)
+        )
 
         if context.rejected:
             self._store_rejections_txn(txn, event.event_id, context.rejected)
@@ -302,15 +300,17 @@ class EventsStore(SQLBaseStore):
             )
 
             if is_new_state and not context.rejected:
-                self._simple_insert_txn(
+                self._simple_upsert_txn(
                     txn,
                     "current_state_events",
-                    {
-                        "event_id": event.event_id,
+                    keyvalues={
                         "room_id": event.room_id,
                         "type": event.type,
                         "state_key": event.state_key,
                     },
+                    values={
+                        "event_id": event.event_id,
+                    }
                 )
 
             for e_id, h in event.prev_state:
diff --git a/synapse/storage/keys.py b/synapse/storage/keys.py
index 09d1e63657..d3b9b38664 100644
--- a/synapse/storage/keys.py
+++ b/synapse/storage/keys.py
@@ -57,16 +57,18 @@ class KeyStore(SQLBaseStore):
             OpenSSL.crypto.FILETYPE_ASN1, tls_certificate
         )
         fingerprint = hashlib.sha256(tls_certificate_bytes).hexdigest()
-        return self._simple_insert(
+        return self._simple_upsert(
             table="server_tls_certificates",
-            values={
+            keyvalues={
                 "server_name": server_name,
                 "fingerprint": fingerprint,
+            },
+            values={
                 "from_server": from_server,
                 "ts_added_ms": time_now_ms,
                 "tls_certificate": buffer(tls_certificate_bytes),
             },
-            or_ignore=True,
+            desc="store_server_certificate",
         )
 
     @defer.inlineCallbacks
@@ -107,14 +109,16 @@ class KeyStore(SQLBaseStore):
             ts_now_ms (int): The time now in milliseconds
             verification_key (VerifyKey): The NACL verify key.
         """
-        return self._simple_insert(
+        return self._simple_upsert(
             table="server_signature_keys",
-            values={
+            keyvalues={
                 "server_name": server_name,
                 "key_id": "%s:%s" % (verify_key.alg, verify_key.version),
+            },
+            values={
                 "from_server": from_server,
                 "ts_added_ms": time_now_ms,
                 "verify_key": buffer(verify_key.encode()),
             },
-            or_ignore=True,
+            desc="store_server_verify_key",
         )
diff --git a/synapse/storage/presence.py b/synapse/storage/presence.py
index 87fba55439..22ec94bc16 100644
--- a/synapse/storage/presence.py
+++ b/synapse/storage/presence.py
@@ -57,6 +57,7 @@ class PresenceStore(SQLBaseStore):
             values={"observed_user_id": observed_localpart,
                     "observer_user_id": observer_userid},
             desc="allow_presence_visible",
+            or_ignore=True,
         )
 
     def disallow_presence_visible(self, observed_localpart, observer_userid):
diff --git a/synapse/storage/profile.py b/synapse/storage/profile.py
index a6e52cb248..09778045bf 100644
--- a/synapse/storage/profile.py
+++ b/synapse/storage/profile.py
@@ -13,6 +13,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from twisted.internet import defer
+
 from ._base import SQLBaseStore
 
 
@@ -24,19 +26,25 @@ class ProfileStore(SQLBaseStore):
             desc="create_profile",
         )
 
+    @defer.inlineCallbacks
     def get_profile_displayname(self, user_localpart):
-        return self._simple_select_one_onecol(
+        name = yield self._simple_select_one_onecol(
             table="profiles",
             keyvalues={"user_id": user_localpart},
             retcol="displayname",
             desc="get_profile_displayname",
         )
 
+        if name:
+            name = name.decode("utf8")
+
+        defer.returnValue(name)
+
     def set_profile_displayname(self, user_localpart, new_displayname):
         return self._simple_update_one(
             table="profiles",
             keyvalues={"user_id": user_localpart},
-            updatevalues={"displayname": new_displayname},
+            updatevalues={"displayname": new_displayname.encode("utf8")},
             desc="set_profile_displayname",
         )
 
diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py
index c47bdc2861..ee7718d5ed 100644
--- a/synapse/storage/push_rule.py
+++ b/synapse/storage/push_rule.py
@@ -154,7 +154,7 @@ class PushRuleStore(SQLBaseStore):
             txn.execute(sql, (user_name, priority_class, new_rule_priority))
 
         # now insert the new rule
-        sql = "INSERT OR REPLACE INTO "+PushRuleTable.table_name+" ("
+        sql = "INSERT INTO "+PushRuleTable.table_name+" ("
         sql += ",".join(new_rule.keys())+") VALUES ("
         sql += ", ".join(["?" for _ in new_rule.keys()])+")"
 
@@ -183,7 +183,7 @@ class PushRuleStore(SQLBaseStore):
         new_rule['priority_class'] = priority_class
         new_rule['priority'] = new_prio
 
-        sql = "INSERT OR REPLACE INTO "+PushRuleTable.table_name+" ("
+        sql = "INSERT INTO "+PushRuleTable.table_name+" ("
         sql += ",".join(new_rule.keys())+") VALUES ("
         sql += ", ".join(["?" for _ in new_rule.keys()])+")"
 
diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py
index f24154f146..f7d8291281 100644
--- a/synapse/storage/registration.py
+++ b/synapse/storage/registration.py
@@ -15,8 +15,6 @@
 
 from twisted.internet import defer
 
-from sqlite3 import IntegrityError
-
 from synapse.api.errors import StoreError, Codes
 
 from ._base import SQLBaseStore, cached
@@ -39,17 +37,13 @@ class RegistrationStore(SQLBaseStore):
         Raises:
             StoreError if there was a problem adding this.
         """
-        row = yield self._simple_select_one(
-            "users", {"name": user_id}, ["id"],
-            desc="add_access_token_to_user",
-        )
-        if not row:
-            raise StoreError(400, "Bad user ID supplied.")
-        row_id = row["id"]
-        yield self._simple_insert(
+        next_id = yield self._access_tokens_id_gen.get_next()
+
+        self._simple_insert(
             "access_tokens",
             {
-                "user_id": row_id,
+                "id": next_id,
+                "user_id": user_id,
                 "token": token
             },
             desc="add_access_token_to_user",
@@ -74,27 +68,41 @@ class RegistrationStore(SQLBaseStore):
     def _register(self, txn, user_id, token, password_hash):
         now = int(self.clock.time())
 
+        next_id = self._access_tokens_id_gen.get_next_txn(txn)
+
         try:
             txn.execute("INSERT INTO users(name, password_hash, creation_ts) "
                         "VALUES (?,?,?)",
                         [user_id, password_hash, now])
-        except IntegrityError:
+        except self.database_engine.module.IntegrityError:
             raise StoreError(
                 400, "User ID already taken.", errcode=Codes.USER_IN_USE
             )
 
         # it's possible for this to get a conflict, but only for a single user
         # since tokens are namespaced based on their user ID
-        txn.execute("INSERT INTO access_tokens(user_id, token) " +
-                    "VALUES (?,?)", [txn.lastrowid, token])
+        txn.execute(
+            "INSERT INTO access_tokens(id, user_id, token)"
+            " VALUES (?,?,?)",
+            (next_id, user_id, token,)
+        )
 
+    @defer.inlineCallbacks
     def get_user_by_id(self, user_id):
-        query = ("SELECT users.name, users.password_hash FROM users"
-                 " WHERE users.name = ?")
-        return self._execute(
-            "get_user_by_id", self.cursor_to_dict, query, user_id
+        user_info = yield self._simple_select_one(
+            table="users",
+            keyvalues={
+                "name": user_id,
+            },
+            retcols=["name", "password_hash"],
+            allow_none=True,
         )
 
+        if user_info:
+            user_info["password_hash"] = user_info["password_hash"].decode("utf8")
+
+        defer.returnValue(user_info)
+
     @cached()
     # TODO(paul): Currently there's no code to invalidate this cache. That
     #   means if/when we ever add internal ways to invalidate access tokens or
@@ -134,12 +142,12 @@ class RegistrationStore(SQLBaseStore):
             "SELECT users.name, users.admin,"
             " access_tokens.device_id, access_tokens.id as token_id"
             " FROM users"
-            " INNER JOIN access_tokens on users.id = access_tokens.user_id"
+            " INNER JOIN access_tokens on users.name = access_tokens.user_id"
             " WHERE token = ?"
         )
 
-        cursor = txn.execute(sql, (token,))
-        rows = self.cursor_to_dict(cursor)
+        txn.execute(sql, (token,))
+        rows = self.cursor_to_dict(txn)
         if rows:
             return rows[0]
 
diff --git a/synapse/storage/room.py b/synapse/storage/room.py
index be3e28c2ea..a1a76280fe 100644
--- a/synapse/storage/room.py
+++ b/synapse/storage/room.py
@@ -72,6 +72,7 @@ class RoomStore(SQLBaseStore):
             keyvalues={"room_id": room_id},
             retcols=RoomsTable.fields,
             desc="get_room",
+            allow_none=True,
         )
 
     @defer.inlineCallbacks
@@ -102,10 +103,10 @@ class RoomStore(SQLBaseStore):
                 "ON c.event_id = room_names.event_id "
             )
 
-            # We use non printing ascii character US () as a seperator
+            # We use non printing ascii character US (\x1F) as a separator
             sql = (
                 "SELECT r.room_id, n.name, t.topic, "
-                "group_concat(a.room_alias, '') "
+                "group_concat(a.room_alias, '\x1F') "
                 "FROM rooms AS r "
                 "LEFT JOIN (%(topic)s) AS t ON t.room_id = r.room_id "
                 "LEFT JOIN (%(name)s) AS n ON n.room_id = r.room_id "
@@ -117,9 +118,9 @@ class RoomStore(SQLBaseStore):
                 "name": name_subquery,
             }
 
-            c = txn.execute(sql, (is_public,))
+            txn.execute(sql, (is_public,))
 
-            return c.fetchall()
+            return txn.fetchall()
 
         rows = yield self.runInteraction(
             "get_rooms", f
@@ -130,7 +131,7 @@ class RoomStore(SQLBaseStore):
                 "room_id": r[0],
                 "name": r[1],
                 "topic": r[2],
-                "aliases": r[3].split(""),
+                "aliases": r[3].split("\x1F"),
             }
             for r in rows
         ]
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 52c37c76f5..8ea5756d61 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -40,7 +40,6 @@ class RoomMemberStore(SQLBaseStore):
         """
         try:
             target_user_id = event.state_key
-            domain = UserID.from_string(target_user_id).domain
         except:
             logger.exception(
                 "Failed to parse target_user_id=%s", target_user_id
@@ -65,42 +64,8 @@ class RoomMemberStore(SQLBaseStore):
             }
         )
 
-        # Update room hosts table
-        if event.membership == Membership.JOIN:
-            sql = (
-                "INSERT OR IGNORE INTO room_hosts (room_id, host) "
-                "VALUES (?, ?)"
-            )
-            txn.execute(sql, (event.room_id, domain))
-        elif event.membership != Membership.INVITE:
-            # Check if this was the last person to have left.
-            member_events = self._get_members_query_txn(
-                txn,
-                where_clause=("c.room_id = ? AND m.membership = ?"
-                              " AND m.user_id != ?"),
-                where_values=(event.room_id, Membership.JOIN, target_user_id,)
-            )
-
-            joined_domains = set()
-            for e in member_events:
-                try:
-                    joined_domains.add(
-                        UserID.from_string(e.state_key).domain
-                    )
-                except:
-                    # FIXME: How do we deal with invalid user ids in the db?
-                    logger.exception("Invalid user_id: %s", event.state_key)
-
-            if domain not in joined_domains:
-                sql = (
-                    "DELETE FROM room_hosts WHERE room_id = ? AND host = ?"
-                )
-
-                txn.execute(sql, (event.room_id, domain))
-
         self.get_rooms_for_user.invalidate(target_user_id)
 
-    @defer.inlineCallbacks
     def get_room_member(self, user_id, room_id):
         """Retrieve the current state of a room member.
 
@@ -110,41 +75,27 @@ class RoomMemberStore(SQLBaseStore):
         Returns:
             Deferred: Results in a MembershipEvent or None.
         """
-        rows = yield self._get_members_by_dict({
-            "e.room_id": room_id,
-            "m.user_id": user_id,
-        })
+        def f(txn):
+            events = self._get_members_events_txn(
+                txn,
+                room_id,
+                user_id=user_id,
+            )
 
-        defer.returnValue(rows[0] if rows else None)
+            return events[0] if events else None
 
-    def _get_room_member(self, txn, user_id, room_id):
-        sql = (
-            "SELECT e.* FROM events as e"
-            " INNER JOIN room_memberships as m"
-            " ON e.event_id = m.event_id"
-            " INNER JOIN current_state_events as c"
-            " ON m.event_id = c.event_id"
-            " WHERE m.user_id = ? and e.room_id = ?"
-            " LIMIT 1"
-        )
-        txn.execute(sql, (user_id, room_id))
-        rows = self.cursor_to_dict(txn)
-        if rows:
-            return self._parse_events_txn(txn, rows)[0]
-        else:
-            return None
+        return self.runInteraction("get_room_member", f)
 
     def get_users_in_room(self, room_id):
         def f(txn):
-            sql = (
-                "SELECT m.user_id FROM room_memberships as m"
-                " INNER JOIN current_state_events as c"
-                " ON m.event_id = c.event_id"
-                " WHERE m.membership = ? AND m.room_id = ?"
+
+            rows = self._get_members_rows_txn(
+                txn,
+                room_id=room_id,
+                membership=Membership.JOIN,
             )
 
-            txn.execute(sql, (Membership.JOIN, room_id))
-            return [r[0] for r in txn.fetchall()]
+            return [r["user_id"] for r in rows]
         return self.runInteraction("get_users_in_room", f)
 
     def get_room_members(self, room_id, membership=None):
@@ -159,11 +110,14 @@ class RoomMemberStore(SQLBaseStore):
             list of namedtuples representing the members in this room.
         """
 
-        where = {"m.room_id": room_id}
-        if membership:
-            where["m.membership"] = membership
+        def f(txn):
+            return self._get_members_events_txn(
+                txn,
+                room_id,
+                membership=membership,
+            )
 
-        return self._get_members_by_dict(where)
+        return self.runInteraction("get_room_members", f)
 
     def get_rooms_for_user_where_membership_is(self, user_id, membership_list):
         """ Get all the rooms for this user where the membership for this user
@@ -209,32 +163,55 @@ class RoomMemberStore(SQLBaseStore):
         ]
 
     def get_joined_hosts_for_room(self, room_id):
-        return self._simple_select_onecol(
-            "room_hosts",
-            {"room_id": room_id},
-            "host",
-            desc="get_joined_hosts_for_room",
+        return self.runInteraction(
+            "get_joined_hosts_for_room",
+            self._get_joined_hosts_for_room_txn,
+            room_id,
+        )
+
+    def _get_joined_hosts_for_room_txn(self, txn, room_id):
+        rows = self._get_members_rows_txn(
+            txn,
+            room_id, membership=Membership.JOIN
+        )
+
+        joined_domains = set(
+            UserID.from_string(r["user_id"]).domain
+            for r in rows
         )
 
-    def _get_members_by_dict(self, where_dict):
-        clause = " AND ".join("%s = ?" % k for k in where_dict.keys())
-        vals = where_dict.values()
-        return self._get_members_query(clause, vals)
+        return joined_domains
 
     def _get_members_query(self, where_clause, where_values):
         return self.runInteraction(
-            "get_members_query", self._get_members_query_txn,
+            "get_members_query", self._get_members_events_txn,
             where_clause, where_values
         )
 
-    def _get_members_query_txn(self, txn, where_clause, where_values):
+    def _get_members_events_txn(self, txn, room_id, membership=None, user_id=None):
+        rows = self._get_members_rows_txn(
+            txn,
+            room_id, membership, user_id,
+        )
+        return self._get_events_txn(txn, [r["event_id"] for r in rows])
+
+    def _get_members_rows_txn(self, txn, room_id, membership=None, user_id=None):
+        where_clause = "c.room_id = ?"
+        where_values = [room_id]
+
+        if membership:
+            where_clause += " AND m.membership = ?"
+            where_values.append(membership)
+
+        if user_id:
+            where_clause += " AND m.user_id = ?"
+            where_values.append(user_id)
+
         sql = (
-            "SELECT e.* FROM events as e "
-            "INNER JOIN room_memberships as m "
-            "ON e.event_id = m.event_id "
-            "INNER JOIN current_state_events as c "
-            "ON m.event_id = c.event_id "
-            "WHERE %(where)s "
+            "SELECT m.* FROM room_memberships as m"
+            " INNER JOIN current_state_events as c"
+            " ON m.event_id = c.event_id"
+            " WHERE %(where)s"
         ) % {
             "where": where_clause,
         }
@@ -242,8 +219,7 @@ class RoomMemberStore(SQLBaseStore):
         txn.execute(sql, where_values)
         rows = self.cursor_to_dict(txn)
 
-        results = self._parse_events_txn(txn, rows)
-        return results
+        return rows
 
     @cached()
     def get_rooms_for_user(self, user_id):
diff --git a/synapse/storage/schema/delta/12/v12.sql b/synapse/storage/schema/delta/12/v12.sql
index b87ef1fe79..717d289f78 100644
--- a/synapse/storage/schema/delta/12/v12.sql
+++ b/synapse/storage/schema/delta/12/v12.sql
@@ -14,54 +14,50 @@
  */
 
 CREATE TABLE IF NOT EXISTS rejections(
-    event_id TEXT NOT NULL,
-    reason TEXT NOT NULL,
-    last_check TEXT NOT NULL,
-    CONSTRAINT ev_id UNIQUE (event_id) ON CONFLICT REPLACE
-);
+    event_id VARCHAR(150) NOT NULL,
+    reason VARCHAR(150) NOT NULL,
+    last_check VARCHAR(150) NOT NULL,
+    UNIQUE (event_id)
+) ;
 
 -- Push notification endpoints that users have configured
 CREATE TABLE IF NOT EXISTS pushers (
-  id INTEGER PRIMARY KEY AUTOINCREMENT,
-  user_name TEXT NOT NULL,
-  profile_tag varchar(32) NOT NULL,
-  kind varchar(8) NOT NULL,
-  app_id varchar(64) NOT NULL,
-  app_display_name varchar(64) NOT NULL,
-  device_display_name varchar(128) NOT NULL,
-  pushkey blob NOT NULL,
+  id BIGINT PRIMARY KEY,
+  user_name VARCHAR(150) NOT NULL,
+  profile_tag VARCHAR(32) NOT NULL,
+  kind VARCHAR(8) NOT NULL,
+  app_id VARCHAR(64) NOT NULL,
+  app_display_name VARCHAR(64) NOT NULL,
+  device_display_name VARCHAR(128) NOT NULL,
+  pushkey VARBINARY(512) NOT NULL,
   ts BIGINT NOT NULL,
-  lang varchar(8),
-  data blob,
+  lang VARCHAR(8),
+  data BLOB,
   last_token TEXT,
   last_success BIGINT,
   failing_since BIGINT,
-  FOREIGN KEY(user_name) REFERENCES users(name),
   UNIQUE (app_id, pushkey)
-);
+) ;
 
 CREATE TABLE IF NOT EXISTS push_rules (
-  id INTEGER PRIMARY KEY AUTOINCREMENT,
-  user_name TEXT NOT NULL,
-  rule_id TEXT NOT NULL,
+  id BIGINT PRIMARY KEY,
+  user_name VARCHAR(150) NOT NULL,
+  rule_id VARCHAR(150) NOT NULL,
   priority_class TINYINT NOT NULL,
   priority INTEGER NOT NULL DEFAULT 0,
-  conditions TEXT NOT NULL,
-  actions TEXT NOT NULL,
+  conditions VARCHAR(150) NOT NULL,
+  actions VARCHAR(150) NOT NULL,
   UNIQUE(user_name, rule_id)
-);
+) ;
 
 CREATE INDEX IF NOT EXISTS push_rules_user_name on push_rules (user_name);
 
 CREATE TABLE IF NOT EXISTS user_filters(
-  user_id TEXT,
-  filter_id INTEGER,
-  filter_json TEXT,
-  FOREIGN KEY(user_id) REFERENCES users(id)
-);
+  user_id VARCHAR(150),
+  filter_id BIGINT,
+  filter_json BLOB
+) ;
 
 CREATE INDEX IF NOT EXISTS user_filters_by_user_id_filter_id ON user_filters(
-  user_id, filter_id
+    user_id, filter_id
 );
-
-PRAGMA user_version = 12;
diff --git a/synapse/storage/schema/delta/13/v13.sql b/synapse/storage/schema/delta/13/v13.sql
index e491ad5aec..f5275a59b6 100644
--- a/synapse/storage/schema/delta/13/v13.sql
+++ b/synapse/storage/schema/delta/13/v13.sql
@@ -14,21 +14,18 @@
  */
 
 CREATE TABLE IF NOT EXISTS application_services(
-    id INTEGER PRIMARY KEY AUTOINCREMENT,
-    url TEXT,
-    token TEXT,
-    hs_token TEXT,
-    sender TEXT,
-    UNIQUE(token) ON CONFLICT ROLLBACK
-);
+    id BIGINT PRIMARY KEY,
+    url VARCHAR(150),
+    token VARCHAR(150),
+    hs_token VARCHAR(150),
+    sender VARCHAR(150),
+    UNIQUE(token)
+) ;
 
 CREATE TABLE IF NOT EXISTS application_services_regex(
-    id INTEGER PRIMARY KEY AUTOINCREMENT,
-    as_id INTEGER NOT NULL,
+    id BIGINT PRIMARY KEY,
+    as_id BIGINT NOT NULL,
     namespace INTEGER,  /* enum[room_id|room_alias|user_id] */
-    regex TEXT,
+    regex VARCHAR(150),
     FOREIGN KEY(as_id) REFERENCES application_services(id)
-);
-
-
-
+) ;
diff --git a/synapse/storage/schema/delta/14/v14.sql b/synapse/storage/schema/delta/14/v14.sql
index 0212726448..1d582cc626 100644
--- a/synapse/storage/schema/delta/14/v14.sql
+++ b/synapse/storage/schema/delta/14/v14.sql
@@ -1,9 +1,9 @@
 CREATE TABLE IF NOT EXISTS push_rules_enable (
-  id INTEGER PRIMARY KEY AUTOINCREMENT,
-  user_name TEXT NOT NULL,
-  rule_id TEXT NOT NULL,
+  id BIGINT PRIMARY KEY,
+  user_name VARCHAR(150) NOT NULL,
+  rule_id VARCHAR(150) NOT NULL,
   enabled TINYINT,
   UNIQUE(user_name, rule_id)
-);
+) ;
 
 CREATE INDEX IF NOT EXISTS push_rules_enable_user_name on push_rules_enable (user_name);
diff --git a/synapse/storage/schema/delta/15/appservice_txns.sql b/synapse/storage/schema/delta/15/appservice_txns.sql
index 2b27e2a429..2f4c3eae5f 100644
--- a/synapse/storage/schema/delta/15/appservice_txns.sql
+++ b/synapse/storage/schema/delta/15/appservice_txns.sql
@@ -14,17 +14,18 @@
  */
 
 CREATE TABLE IF NOT EXISTS application_services_state(
-    as_id TEXT PRIMARY KEY,
-    state TEXT,
-    last_txn TEXT
+    as_id VARCHAR(150) PRIMARY KEY,
+    state VARCHAR(5),
+    last_txn INTEGER
 );
 
 CREATE TABLE IF NOT EXISTS application_services_txns(
-    as_id TEXT NOT NULL,
+    as_id VARCHAR(150) NOT NULL,
     txn_id INTEGER NOT NULL,
-    event_ids TEXT NOT NULL,
-    UNIQUE(as_id, txn_id) ON CONFLICT ROLLBACK
+    event_ids BLOB NOT NULL,
+    UNIQUE(as_id, txn_id)
 );
 
-
-
+CREATE INDEX IF NOT EXISTS application_services_txns_id ON application_services_txns (
+    as_id
+);
diff --git a/synapse/storage/schema/delta/15/presence_indices.sql b/synapse/storage/schema/delta/15/presence_indices.sql
new file mode 100644
index 0000000000..6b8d0f1ca7
--- /dev/null
+++ b/synapse/storage/schema/delta/15/presence_indices.sql
@@ -0,0 +1,2 @@
+
+CREATE INDEX IF NOT EXISTS presence_list_user_id ON presence_list (user_id);
diff --git a/synapse/storage/schema/full_schemas/11/event_edges.sql b/synapse/storage/schema/full_schemas/11/event_edges.sql
index 1e766d6db2..124c9a9bdf 100644
--- a/synapse/storage/schema/full_schemas/11/event_edges.sql
+++ b/synapse/storage/schema/full_schemas/11/event_edges.sql
@@ -14,63 +14,63 @@
  */
 
 CREATE TABLE IF NOT EXISTS event_forward_extremities(
-    event_id TEXT NOT NULL,
-    room_id TEXT NOT NULL,
-    CONSTRAINT uniqueness UNIQUE (event_id, room_id) ON CONFLICT REPLACE
-);
+    event_id VARCHAR(150) NOT NULL,
+    room_id VARCHAR(150) NOT NULL,
+    UNIQUE (event_id, room_id)
+) ;
 
 CREATE INDEX IF NOT EXISTS ev_extrem_room ON event_forward_extremities(room_id);
 CREATE INDEX IF NOT EXISTS ev_extrem_id ON event_forward_extremities(event_id);
 
 
 CREATE TABLE IF NOT EXISTS event_backward_extremities(
-    event_id TEXT NOT NULL,
-    room_id TEXT NOT NULL,
-    CONSTRAINT uniqueness UNIQUE (event_id, room_id) ON CONFLICT REPLACE
-);
+    event_id VARCHAR(150) NOT NULL,
+    room_id VARCHAR(150) NOT NULL,
+    UNIQUE (event_id, room_id)
+) ;
 
 CREATE INDEX IF NOT EXISTS ev_b_extrem_room ON event_backward_extremities(room_id);
 CREATE INDEX IF NOT EXISTS ev_b_extrem_id ON event_backward_extremities(event_id);
 
 
 CREATE TABLE IF NOT EXISTS event_edges(
-    event_id TEXT NOT NULL,
-    prev_event_id TEXT NOT NULL,
-    room_id TEXT NOT NULL,
-    is_state INTEGER NOT NULL,
-    CONSTRAINT uniqueness UNIQUE (event_id, prev_event_id, room_id, is_state)
-);
+    event_id VARCHAR(150) NOT NULL,
+    prev_event_id VARCHAR(150) NOT NULL,
+    room_id VARCHAR(150) NOT NULL,
+    is_state BOOL NOT NULL,
+    UNIQUE (event_id, prev_event_id, room_id, is_state)
+) ;
 
 CREATE INDEX IF NOT EXISTS ev_edges_id ON event_edges(event_id);
 CREATE INDEX IF NOT EXISTS ev_edges_prev_id ON event_edges(prev_event_id);
 
 
 CREATE TABLE IF NOT EXISTS room_depth(
-    room_id TEXT NOT NULL,
+    room_id VARCHAR(150) NOT NULL,
     min_depth INTEGER NOT NULL,
-    CONSTRAINT uniqueness UNIQUE (room_id)
-);
+    UNIQUE (room_id)
+) ;
 
 CREATE INDEX IF NOT EXISTS room_depth_room ON room_depth(room_id);
 
 
 create TABLE IF NOT EXISTS event_destinations(
-    event_id TEXT NOT NULL,
-    destination TEXT NOT NULL,
-    delivered_ts INTEGER DEFAULT 0, -- or 0 if not delivered
-    CONSTRAINT uniqueness UNIQUE (event_id, destination) ON CONFLICT REPLACE
-);
+    event_id VARCHAR(150) NOT NULL,
+    destination VARCHAR(150) NOT NULL,
+    delivered_ts BIGINT DEFAULT 0, -- or 0 if not delivered
+    UNIQUE (event_id, destination)
+) ;
 
 CREATE INDEX IF NOT EXISTS event_destinations_id ON event_destinations(event_id);
 
 
 CREATE TABLE IF NOT EXISTS state_forward_extremities(
-    event_id TEXT NOT NULL,
-    room_id TEXT NOT NULL,
-    type TEXT NOT NULL,
-    state_key TEXT NOT NULL,
-    CONSTRAINT uniqueness UNIQUE (event_id, room_id) ON CONFLICT REPLACE
-);
+    event_id VARCHAR(150) NOT NULL,
+    room_id VARCHAR(150) NOT NULL,
+    type VARCHAR(150) NOT NULL,
+    state_key VARCHAR(150) NOT NULL,
+    UNIQUE (event_id, room_id)
+) ;
 
 CREATE INDEX IF NOT EXISTS st_extrem_keys ON state_forward_extremities(
     room_id, type, state_key
@@ -79,11 +79,11 @@ CREATE INDEX IF NOT EXISTS st_extrem_id ON state_forward_extremities(event_id);
 
 
 CREATE TABLE IF NOT EXISTS event_auth(
-    event_id TEXT NOT NULL,
-    auth_id TEXT NOT NULL,
-    room_id TEXT NOT NULL,
-    CONSTRAINT uniqueness UNIQUE (event_id, auth_id, room_id)
-);
+    event_id VARCHAR(150) NOT NULL,
+    auth_id VARCHAR(150) NOT NULL,
+    room_id VARCHAR(150) NOT NULL,
+    UNIQUE (event_id, auth_id, room_id)
+) ;
 
 CREATE INDEX IF NOT EXISTS evauth_edges_id ON event_auth(event_id);
-CREATE INDEX IF NOT EXISTS evauth_edges_auth_id ON event_auth(auth_id);
\ No newline at end of file
+CREATE INDEX IF NOT EXISTS evauth_edges_auth_id ON event_auth(auth_id);
diff --git a/synapse/storage/schema/full_schemas/11/event_signatures.sql b/synapse/storage/schema/full_schemas/11/event_signatures.sql
index c28c39c48a..30e3f71c5f 100644
--- a/synapse/storage/schema/full_schemas/11/event_signatures.sql
+++ b/synapse/storage/schema/full_schemas/11/event_signatures.sql
@@ -14,52 +14,42 @@
  */
 
 CREATE TABLE IF NOT EXISTS event_content_hashes (
-    event_id TEXT,
-    algorithm TEXT,
+    event_id VARCHAR(150),
+    algorithm VARCHAR(150),
     hash BLOB,
-    CONSTRAINT uniqueness UNIQUE (event_id, algorithm)
-);
+    UNIQUE (event_id, algorithm)
+) ;
 
-CREATE INDEX IF NOT EXISTS event_content_hashes_id ON event_content_hashes(
-    event_id
-);
+CREATE INDEX IF NOT EXISTS event_content_hashes_id ON event_content_hashes(event_id);
 
 
 CREATE TABLE IF NOT EXISTS event_reference_hashes (
-    event_id TEXT,
-    algorithm TEXT,
+    event_id VARCHAR(150),
+    algorithm VARCHAR(150),
     hash BLOB,
-    CONSTRAINT uniqueness UNIQUE (event_id, algorithm)
-);
+    UNIQUE (event_id, algorithm)
+) ;
 
-CREATE INDEX IF NOT EXISTS event_reference_hashes_id ON event_reference_hashes (
-    event_id
-);
+CREATE INDEX IF NOT EXISTS event_reference_hashes_id ON event_reference_hashes(event_id);
 
 
 CREATE TABLE IF NOT EXISTS event_signatures (
-    event_id TEXT,
-    signature_name TEXT,
-    key_id TEXT,
+    event_id VARCHAR(150),
+    signature_name VARCHAR(150),
+    key_id VARCHAR(150),
     signature BLOB,
-    CONSTRAINT uniqueness UNIQUE (event_id, signature_name, key_id)
-);
+    UNIQUE (event_id, signature_name, key_id)
+) ;
 
-CREATE INDEX IF NOT EXISTS event_signatures_id ON event_signatures (
-    event_id
-);
+CREATE INDEX IF NOT EXISTS event_signatures_id ON event_signatures(event_id);
 
 
 CREATE TABLE IF NOT EXISTS event_edge_hashes(
-    event_id TEXT,
-    prev_event_id TEXT,
-    algorithm TEXT,
+    event_id VARCHAR(150),
+    prev_event_id VARCHAR(150),
+    algorithm VARCHAR(150),
     hash BLOB,
-    CONSTRAINT uniqueness UNIQUE (
-        event_id, prev_event_id, algorithm
-    )
-);
+    UNIQUE (event_id, prev_event_id, algorithm)
+) ;
 
-CREATE INDEX IF NOT EXISTS event_edge_hashes_id ON event_edge_hashes(
-    event_id
-);
+CREATE INDEX IF NOT EXISTS event_edge_hashes_id ON event_edge_hashes(event_id);
diff --git a/synapse/storage/schema/full_schemas/11/im.sql b/synapse/storage/schema/full_schemas/11/im.sql
index dd00c1cd2f..7cb8f802e1 100644
--- a/synapse/storage/schema/full_schemas/11/im.sql
+++ b/synapse/storage/schema/full_schemas/11/im.sql
@@ -14,112 +14,111 @@
  */
 
 CREATE TABLE IF NOT EXISTS events(
-    stream_ordering INTEGER PRIMARY KEY AUTOINCREMENT,
-    topological_ordering INTEGER NOT NULL,
-    event_id TEXT NOT NULL,
-    type TEXT NOT NULL,
-    room_id TEXT NOT NULL,
-    content TEXT NOT NULL,
-    unrecognized_keys TEXT,
+    stream_ordering BIGINT PRIMARY KEY,
+    topological_ordering BIGINT NOT NULL,
+    event_id VARCHAR(150) NOT NULL,
+    type VARCHAR(150) NOT NULL,
+    room_id VARCHAR(150) NOT NULL,
+    content BLOB NOT NULL,
+    unrecognized_keys BLOB,
     processed BOOL NOT NULL,
     outlier BOOL NOT NULL,
-    depth INTEGER DEFAULT 0 NOT NULL,
-    CONSTRAINT ev_uniq UNIQUE (event_id)
-);
+    depth BIGINT DEFAULT 0 NOT NULL,
+    UNIQUE (event_id)
+) ;
 
-CREATE INDEX IF NOT EXISTS events_event_id ON events (event_id);
 CREATE INDEX IF NOT EXISTS events_stream_ordering ON events (stream_ordering);
 CREATE INDEX IF NOT EXISTS events_topological_ordering ON events (topological_ordering);
 CREATE INDEX IF NOT EXISTS events_room_id ON events (room_id);
 
 
 CREATE TABLE IF NOT EXISTS event_json(
-    event_id TEXT NOT NULL,
-    room_id TEXT NOT NULL,
-    internal_metadata NOT NULL,
+    event_id VARCHAR(150) NOT NULL,
+    room_id VARCHAR(150) NOT NULL,
+    internal_metadata BLOB NOT NULL,
     json BLOB NOT NULL,
-    CONSTRAINT ev_j_uniq UNIQUE (event_id)
-);
+    UNIQUE (event_id)
+) ;
 
-CREATE INDEX IF NOT EXISTS event_json_id ON event_json(event_id);
 CREATE INDEX IF NOT EXISTS event_json_room_id ON event_json(room_id);
 
 
 CREATE TABLE IF NOT EXISTS state_events(
-    event_id TEXT NOT NULL,
-    room_id TEXT NOT NULL,
-    type TEXT NOT NULL,
-    state_key TEXT NOT NULL,
-    prev_state TEXT
-);
+    event_id VARCHAR(150) NOT NULL,
+    room_id VARCHAR(150) NOT NULL,
+    type VARCHAR(150) NOT NULL,
+    state_key VARCHAR(150) NOT NULL,
+    prev_state VARCHAR(150),
+    UNIQUE (event_id)
+) ;
 
-CREATE UNIQUE INDEX IF NOT EXISTS state_events_event_id ON state_events (event_id);
 CREATE INDEX IF NOT EXISTS state_events_room_id ON state_events (room_id);
 CREATE INDEX IF NOT EXISTS state_events_type ON state_events (type);
 CREATE INDEX IF NOT EXISTS state_events_state_key ON state_events (state_key);
 
 
 CREATE TABLE IF NOT EXISTS current_state_events(
-    event_id TEXT NOT NULL,
-    room_id TEXT NOT NULL,
-    type TEXT NOT NULL,
-    state_key TEXT NOT NULL,
-    CONSTRAINT curr_uniq UNIQUE (room_id, type, state_key) ON CONFLICT REPLACE
-);
+    event_id VARCHAR(150) NOT NULL,
+    room_id VARCHAR(150) NOT NULL,
+    type VARCHAR(150) NOT NULL,
+    state_key VARCHAR(150) NOT NULL,
+    UNIQUE (event_id),
+    UNIQUE (room_id, type, state_key)
+) ;
 
-CREATE INDEX IF NOT EXISTS curr_events_event_id ON current_state_events (event_id);
 CREATE INDEX IF NOT EXISTS current_state_events_room_id ON current_state_events (room_id);
 CREATE INDEX IF NOT EXISTS current_state_events_type ON current_state_events (type);
 CREATE INDEX IF NOT EXISTS current_state_events_state_key ON current_state_events (state_key);
 
 CREATE TABLE IF NOT EXISTS room_memberships(
-    event_id TEXT NOT NULL,
-    user_id TEXT NOT NULL,
-    sender TEXT NOT NULL,
-    room_id TEXT NOT NULL,
-    membership TEXT NOT NULL
-);
+    event_id VARCHAR(150) NOT NULL,
+    user_id VARCHAR(150) NOT NULL,
+    sender VARCHAR(150) NOT NULL,
+    room_id VARCHAR(150) NOT NULL,
+    membership VARCHAR(150) NOT NULL,
+    UNIQUE (event_id)
+) ;
 
-CREATE INDEX IF NOT EXISTS room_memberships_event_id ON room_memberships (event_id);
 CREATE INDEX IF NOT EXISTS room_memberships_room_id ON room_memberships (room_id);
 CREATE INDEX IF NOT EXISTS room_memberships_user_id ON room_memberships (user_id);
 
 CREATE TABLE IF NOT EXISTS feedback(
-    event_id TEXT NOT NULL,
-    feedback_type TEXT,
-    target_event_id TEXT,
-    sender TEXT,
-    room_id TEXT
-);
+    event_id VARCHAR(150) NOT NULL,
+    feedback_type VARCHAR(150),
+    target_event_id VARCHAR(150),
+    sender VARCHAR(150),
+    room_id VARCHAR(150),
+    UNIQUE (event_id)
+) ;
 
 CREATE TABLE IF NOT EXISTS topics(
-    event_id TEXT NOT NULL,
-    room_id TEXT NOT NULL,
-    topic TEXT NOT NULL
-);
+    event_id VARCHAR(150) NOT NULL,
+    room_id VARCHAR(150) NOT NULL,
+    topic VARCHAR(150) NOT NULL,
+    UNIQUE (event_id)
+) ;
 
-CREATE INDEX IF NOT EXISTS topics_event_id ON topics(event_id);
 CREATE INDEX IF NOT EXISTS topics_room_id ON topics(room_id);
 
 CREATE TABLE IF NOT EXISTS room_names(
-    event_id TEXT NOT NULL,
-    room_id TEXT NOT NULL,
-    name TEXT NOT NULL
+    event_id VARCHAR(150) NOT NULL,
+    room_id VARCHAR(150) NOT NULL,
+    name VARCHAR(150) NOT NULL,
+    UNIQUE (event_id)
 );
 
-CREATE INDEX IF NOT EXISTS room_names_event_id ON room_names(event_id);
 CREATE INDEX IF NOT EXISTS room_names_room_id ON room_names(room_id);
 
 CREATE TABLE IF NOT EXISTS rooms(
-    room_id TEXT PRIMARY KEY NOT NULL,
-    is_public INTEGER,
-    creator TEXT
-);
+    room_id VARCHAR(150) PRIMARY KEY NOT NULL,
+    is_public BOOL,
+    creator VARCHAR(150)
+) ;
 
 CREATE TABLE IF NOT EXISTS room_hosts(
-    room_id TEXT NOT NULL,
-    host TEXT NOT NULL,
-    CONSTRAINT room_hosts_uniq UNIQUE (room_id, host) ON CONFLICT IGNORE
-);
+    room_id VARCHAR(150) NOT NULL,
+    host VARCHAR(150) NOT NULL,
+    UNIQUE (room_id, host)
+) ;
 
 CREATE INDEX IF NOT EXISTS room_hosts_room_id ON room_hosts (room_id);
diff --git a/synapse/storage/schema/full_schemas/11/keys.sql b/synapse/storage/schema/full_schemas/11/keys.sql
index a9e0a4fe0d..062ca53fef 100644
--- a/synapse/storage/schema/full_schemas/11/keys.sql
+++ b/synapse/storage/schema/full_schemas/11/keys.sql
@@ -13,19 +13,19 @@
  * limitations under the License.
  */
 CREATE TABLE IF NOT EXISTS server_tls_certificates(
-  server_name TEXT, -- Server name.
-  fingerprint TEXT, -- Certificate fingerprint.
-  from_server TEXT, -- Which key server the certificate was fetched from.
-  ts_added_ms INTEGER, -- When the certifcate was added.
+  server_name VARCHAR(150), -- Server name.
+  fingerprint VARCHAR(150), -- Certificate fingerprint.
+  from_server VARCHAR(150), -- Which key server the certificate was fetched from.
+  ts_added_ms BIGINT, -- When the certifcate was added.
   tls_certificate BLOB, -- DER encoded x509 certificate.
-  CONSTRAINT uniqueness UNIQUE (server_name, fingerprint)
-);
+  UNIQUE (server_name, fingerprint)
+) ;
 
 CREATE TABLE IF NOT EXISTS server_signature_keys(
-  server_name TEXT, -- Server name.
-  key_id TEXT, -- Key version.
-  from_server TEXT, -- Which key server the key was fetched form.
-  ts_added_ms INTEGER, -- When the key was added.
+  server_name VARCHAR(150), -- Server name.
+  key_id VARCHAR(150), -- Key version.
+  from_server VARCHAR(150), -- Which key server the key was fetched form.
+  ts_added_ms BIGINT, -- When the key was added.
   verify_key BLOB, -- NACL verification key.
-  CONSTRAINT uniqueness UNIQUE (server_name, key_id)
-);
+  UNIQUE (server_name, key_id)
+) ;
diff --git a/synapse/storage/schema/full_schemas/11/media_repository.sql b/synapse/storage/schema/full_schemas/11/media_repository.sql
index afdf48cbfb..c8c5f1d2f0 100644
--- a/synapse/storage/schema/full_schemas/11/media_repository.sql
+++ b/synapse/storage/schema/full_schemas/11/media_repository.sql
@@ -14,55 +14,55 @@
  */
 
 CREATE TABLE IF NOT EXISTS local_media_repository (
-    media_id TEXT, -- The id used to refer to the media.
-    media_type TEXT, -- The MIME-type of the media.
+    media_id VARCHAR(150), -- The id used to refer to the media.
+    media_type VARCHAR(150), -- The MIME-type of the media.
     media_length INTEGER, -- Length of the media in bytes.
-    created_ts INTEGER, -- When the content was uploaded in ms.
-    upload_name TEXT, -- The name the media was uploaded with.
-    user_id TEXT, -- The user who uploaded the file.
-    CONSTRAINT uniqueness UNIQUE (media_id)
-);
+    created_ts BIGINT, -- When the content was uploaded in ms.
+    upload_name VARCHAR(150), -- The name the media was uploaded with.
+    user_id VARCHAR(150), -- The user who uploaded the file.
+    UNIQUE (media_id)
+) ;
 
 CREATE TABLE IF NOT EXISTS local_media_repository_thumbnails (
-    media_id TEXT, -- The id used to refer to the media.
+    media_id VARCHAR(150), -- The id used to refer to the media.
     thumbnail_width INTEGER, -- The width of the thumbnail in pixels.
     thumbnail_height INTEGER, -- The height of the thumbnail in pixels.
-    thumbnail_type TEXT, -- The MIME-type of the thumbnail.
-    thumbnail_method TEXT, -- The method used to make the thumbnail.
+    thumbnail_type VARCHAR(150), -- The MIME-type of the thumbnail.
+    thumbnail_method VARCHAR(150), -- The method used to make the thumbnail.
     thumbnail_length INTEGER, -- The length of the thumbnail in bytes.
-    CONSTRAINT uniqueness UNIQUE (
+    UNIQUE (
         media_id, thumbnail_width, thumbnail_height, thumbnail_type
     )
-);
+) ;
 
 CREATE INDEX IF NOT EXISTS local_media_repository_thumbnails_media_id
     ON local_media_repository_thumbnails (media_id);
 
 CREATE TABLE IF NOT EXISTS remote_media_cache (
-    media_origin TEXT, -- The remote HS the media came from.
-    media_id TEXT, -- The id used to refer to the media on that server.
-    media_type TEXT, -- The MIME-type of the media.
-    created_ts INTEGER, -- When the content was uploaded in ms.
-    upload_name TEXT, -- The name the media was uploaded with.
+    media_origin VARCHAR(150), -- The remote HS the media came from.
+    media_id VARCHAR(150), -- The id used to refer to the media on that server.
+    media_type VARCHAR(150), -- The MIME-type of the media.
+    created_ts BIGINT, -- When the content was uploaded in ms.
+    upload_name VARCHAR(150), -- The name the media was uploaded with.
     media_length INTEGER, -- Length of the media in bytes.
-    filesystem_id TEXT, -- The name used to store the media on disk.
-    CONSTRAINT uniqueness UNIQUE (media_origin, media_id)
-);
+    filesystem_id VARCHAR(150), -- The name used to store the media on disk.
+    UNIQUE (media_origin, media_id)
+) ;
 
 CREATE TABLE IF NOT EXISTS remote_media_cache_thumbnails (
-    media_origin TEXT, -- The remote HS the media came from.
-    media_id TEXT, -- The id used to refer to the media.
+    media_origin VARCHAR(150), -- The remote HS the media came from.
+    media_id VARCHAR(150), -- The id used to refer to the media.
     thumbnail_width INTEGER, -- The width of the thumbnail in pixels.
     thumbnail_height INTEGER, -- The height of the thumbnail in pixels.
-    thumbnail_method TEXT, -- The method used to make the thumbnail
-    thumbnail_type TEXT, -- The MIME-type of the thumbnail.
+    thumbnail_method VARCHAR(150), -- The method used to make the thumbnail
+    thumbnail_type VARCHAR(150), -- The MIME-type of the thumbnail.
     thumbnail_length INTEGER, -- The length of the thumbnail in bytes.
-    filesystem_id TEXT, -- The name used to store the media on disk.
-    CONSTRAINT uniqueness UNIQUE (
+    filesystem_id VARCHAR(150), -- The name used to store the media on disk.
+    UNIQUE (
         media_origin, media_id, thumbnail_width, thumbnail_height,
-        thumbnail_type, thumbnail_type
-    )
-);
+        thumbnail_type
+     )
+) ;
 
 CREATE INDEX IF NOT EXISTS remote_media_cache_thumbnails_media_id
-    ON local_media_repository_thumbnails (media_id);
+    ON remote_media_cache_thumbnails (media_id);
diff --git a/synapse/storage/schema/full_schemas/11/presence.sql b/synapse/storage/schema/full_schemas/11/presence.sql
index f9f8db9697..00d803a5cd 100644
--- a/synapse/storage/schema/full_schemas/11/presence.sql
+++ b/synapse/storage/schema/full_schemas/11/presence.sql
@@ -13,26 +13,26 @@
  * limitations under the License.
  */
 CREATE TABLE IF NOT EXISTS presence(
-  user_id INTEGER NOT NULL,
-  state INTEGER,
-  status_msg TEXT,
-  mtime INTEGER, -- miliseconds since last state change
-  FOREIGN KEY(user_id) REFERENCES users(id)
-);
+  user_id VARCHAR(150) NOT NULL,
+  state VARCHAR(20),
+  status_msg VARCHAR(150),
+  mtime BIGINT, -- miliseconds since last state change
+  UNIQUE (user_id)
+) ;
 
 -- For each of /my/ users which possibly-remote users are allowed to see their
 -- presence state
 CREATE TABLE IF NOT EXISTS presence_allow_inbound(
-  observed_user_id INTEGER NOT NULL,
-  observer_user_id TEXT, -- a UserID,
-  FOREIGN KEY(observed_user_id) REFERENCES users(id)
-);
+  observed_user_id VARCHAR(150) NOT NULL,
+  observer_user_id VARCHAR(150) NOT NULL, -- a UserID,
+  UNIQUE (observed_user_id, observer_user_id)
+) ;
 
 -- For each of /my/ users (watcher), which possibly-remote users are they
 -- watching?
 CREATE TABLE IF NOT EXISTS presence_list(
-  user_id INTEGER NOT NULL,
-  observed_user_id TEXT, -- a UserID,
-  accepted BOOLEAN,
-  FOREIGN KEY(user_id) REFERENCES users(id)
-);
+  user_id VARCHAR(150) NOT NULL,
+  observed_user_id VARCHAR(150) NOT NULL, -- a UserID,
+  accepted BOOLEAN NOT NULL,
+  UNIQUE (user_id, observed_user_id)
+) ;
diff --git a/synapse/storage/schema/full_schemas/11/profiles.sql b/synapse/storage/schema/full_schemas/11/profiles.sql
index f06a528b4d..023060a54c 100644
--- a/synapse/storage/schema/full_schemas/11/profiles.sql
+++ b/synapse/storage/schema/full_schemas/11/profiles.sql
@@ -13,8 +13,8 @@
  * limitations under the License.
  */
 CREATE TABLE IF NOT EXISTS profiles(
-    user_id INTEGER NOT NULL,
-    displayname TEXT,
-    avatar_url TEXT,
-    FOREIGN KEY(user_id) REFERENCES users(id)
-);
+    user_id VARCHAR(150) NOT NULL,
+    displayname VARCHAR(150),
+    avatar_url VARCHAR(150),
+    UNIQUE(user_id)
+) ;
diff --git a/synapse/storage/schema/full_schemas/11/redactions.sql b/synapse/storage/schema/full_schemas/11/redactions.sql
index 5011d95db8..5c23188d62 100644
--- a/synapse/storage/schema/full_schemas/11/redactions.sql
+++ b/synapse/storage/schema/full_schemas/11/redactions.sql
@@ -13,10 +13,10 @@
  * limitations under the License.
  */
 CREATE TABLE IF NOT EXISTS redactions (
-    event_id TEXT NOT NULL,
-    redacts TEXT NOT NULL,
-    CONSTRAINT ev_uniq UNIQUE (event_id)
-);
+    event_id VARCHAR(150) NOT NULL,
+    redacts VARCHAR(150) NOT NULL,
+    UNIQUE (event_id)
+) ;
 
 CREATE INDEX IF NOT EXISTS redactions_event_id ON redactions (event_id);
 CREATE INDEX IF NOT EXISTS redactions_redacts ON redactions (redacts);
diff --git a/synapse/storage/schema/full_schemas/11/room_aliases.sql b/synapse/storage/schema/full_schemas/11/room_aliases.sql
index 0d2df01603..63fe0f5c64 100644
--- a/synapse/storage/schema/full_schemas/11/room_aliases.sql
+++ b/synapse/storage/schema/full_schemas/11/room_aliases.sql
@@ -14,14 +14,12 @@
  */
 
 CREATE TABLE IF NOT EXISTS room_aliases(
-    room_alias TEXT NOT NULL,
-    room_id TEXT NOT NULL
-);
+    room_alias VARCHAR(150) NOT NULL,
+    room_id VARCHAR(150) NOT NULL,
+    UNIQUE (room_alias)
+) ;
 
 CREATE TABLE IF NOT EXISTS room_alias_servers(
-    room_alias TEXT NOT NULL,
-    server TEXT NOT NULL
-);
-
-
-
+    room_alias VARCHAR(150) NOT NULL,
+    server VARCHAR(150) NOT NULL
+) ;
diff --git a/synapse/storage/schema/full_schemas/11/state.sql b/synapse/storage/schema/full_schemas/11/state.sql
index 1fe8f1e430..acfb76439b 100644
--- a/synapse/storage/schema/full_schemas/11/state.sql
+++ b/synapse/storage/schema/full_schemas/11/state.sql
@@ -14,34 +14,27 @@
  */
 
 CREATE TABLE IF NOT EXISTS state_groups(
-    id INTEGER PRIMARY KEY,
-    room_id TEXT NOT NULL,
-    event_id TEXT NOT NULL
-);
+    id VARCHAR(20) PRIMARY KEY,
+    room_id VARCHAR(150) NOT NULL,
+    event_id VARCHAR(150) NOT NULL
+) ;
 
 CREATE TABLE IF NOT EXISTS state_groups_state(
-    state_group INTEGER NOT NULL,
-    room_id TEXT NOT NULL,
-    type TEXT NOT NULL,
-    state_key TEXT NOT NULL,
-    event_id TEXT NOT NULL
-);
+    state_group VARCHAR(20) NOT NULL,
+    room_id VARCHAR(150) NOT NULL,
+    type VARCHAR(150) NOT NULL,
+    state_key VARCHAR(150) NOT NULL,
+    event_id VARCHAR(150) NOT NULL
+) ;
 
 CREATE TABLE IF NOT EXISTS event_to_state_groups(
-    event_id TEXT NOT NULL,
-    state_group INTEGER NOT NULL,
-    CONSTRAINT event_to_state_groups_uniq UNIQUE (event_id)
-);
+    event_id VARCHAR(150) NOT NULL,
+    state_group VARCHAR(150) NOT NULL,
+    UNIQUE (event_id)
+) ;
 
 CREATE INDEX IF NOT EXISTS state_groups_id ON state_groups(id);
 
-CREATE INDEX IF NOT EXISTS state_groups_state_id ON state_groups_state(
-    state_group
-);
-CREATE INDEX IF NOT EXISTS state_groups_state_tuple ON state_groups_state(
-    room_id, type, state_key
-);
-
-CREATE INDEX IF NOT EXISTS event_to_state_groups_id ON event_to_state_groups(
-    event_id
-);
\ No newline at end of file
+CREATE INDEX IF NOT EXISTS state_groups_state_id ON state_groups_state(state_group);
+CREATE INDEX IF NOT EXISTS state_groups_state_tuple ON state_groups_state(room_id, type, state_key);
+CREATE INDEX IF NOT EXISTS event_to_state_groups_id ON event_to_state_groups(event_id);
\ No newline at end of file
diff --git a/synapse/storage/schema/full_schemas/11/transactions.sql b/synapse/storage/schema/full_schemas/11/transactions.sql
index 2d30f99b06..43541661ce 100644
--- a/synapse/storage/schema/full_schemas/11/transactions.sql
+++ b/synapse/storage/schema/full_schemas/11/transactions.sql
@@ -14,34 +14,30 @@
  */
 -- Stores what transaction ids we have received and what our response was
 CREATE TABLE IF NOT EXISTS received_transactions(
-    transaction_id TEXT, 
-    origin TEXT, 
-    ts INTEGER,
+    transaction_id VARCHAR(150),
+    origin VARCHAR(150),
+    ts BIGINT,
     response_code INTEGER,
-    response_json TEXT,
+    response_json BLOB,
     has_been_referenced BOOL default 0, -- Whether thishas been referenced by a prev_tx
-    CONSTRAINT uniquesss UNIQUE (transaction_id, origin) ON CONFLICT REPLACE
-);
+    UNIQUE (transaction_id, origin)
+) ;
 
-CREATE UNIQUE INDEX IF NOT EXISTS transactions_txid ON received_transactions(transaction_id, origin);
 CREATE INDEX IF NOT EXISTS transactions_have_ref ON received_transactions(origin, has_been_referenced);-- WHERE has_been_referenced = 0;
 
 
 -- Stores what transactions we've sent, what their response was (if we got one) and whether we have
 -- since referenced the transaction in another outgoing transaction
 CREATE TABLE IF NOT EXISTS sent_transactions(
-    id INTEGER PRIMARY KEY AUTOINCREMENT, -- This is used to apply insertion ordering
-    transaction_id TEXT,
-    destination TEXT,
+    id BIGINT PRIMARY KEY, -- This is used to apply insertion ordering
+    transaction_id VARCHAR(150),
+    destination VARCHAR(150),
     response_code INTEGER DEFAULT 0,
-    response_json TEXT,
-    ts INTEGER
-);
+    response_json BLOB,
+    ts BIGINT
+) ;
 
 CREATE INDEX IF NOT EXISTS sent_transaction_dest ON sent_transactions(destination);
-CREATE INDEX IF NOT EXISTS sent_transaction_dest_referenced ON sent_transactions(
-    destination
-);
 CREATE INDEX IF NOT EXISTS sent_transaction_txn_id ON sent_transactions(transaction_id);
 -- So that we can do an efficient look up of all transactions that have yet to be successfully
 -- sent.
@@ -51,18 +47,17 @@ CREATE INDEX IF NOT EXISTS sent_transaction_sent ON sent_transactions(response_c
 -- For sent transactions only.
 CREATE TABLE IF NOT EXISTS transaction_id_to_pdu(
     transaction_id INTEGER,
-    destination TEXT,
-    pdu_id TEXT,
-    pdu_origin TEXT
-);
+    destination VARCHAR(150),
+    pdu_id VARCHAR(150),
+    pdu_origin VARCHAR(150),
+    UNIQUE (transaction_id, destination)
+) ;
 
-CREATE INDEX IF NOT EXISTS transaction_id_to_pdu_tx ON transaction_id_to_pdu(transaction_id, destination);
 CREATE INDEX IF NOT EXISTS transaction_id_to_pdu_dest ON transaction_id_to_pdu(destination);
-CREATE INDEX IF NOT EXISTS transaction_id_to_pdu_index ON transaction_id_to_pdu(transaction_id, destination);
 
 -- To track destination health
 CREATE TABLE IF NOT EXISTS destinations(
-    destination TEXT PRIMARY KEY,
-    retry_last_ts INTEGER,
+    destination VARCHAR(150) PRIMARY KEY,
+    retry_last_ts BIGINT,
     retry_interval INTEGER
-);
+) ;
diff --git a/synapse/storage/schema/full_schemas/11/users.sql b/synapse/storage/schema/full_schemas/11/users.sql
index 08ccfdac0a..ba0f42d455 100644
--- a/synapse/storage/schema/full_schemas/11/users.sql
+++ b/synapse/storage/schema/full_schemas/11/users.sql
@@ -13,33 +13,30 @@
  * limitations under the License.
  */
 CREATE TABLE IF NOT EXISTS users(
-    id INTEGER PRIMARY KEY AUTOINCREMENT,
-    name TEXT,
-    password_hash TEXT,
-    creation_ts INTEGER,
+    name VARCHAR(150),
+    password_hash VARCHAR(150),
+    creation_ts BIGINT,
     admin BOOL DEFAULT 0 NOT NULL,
-    UNIQUE(name) ON CONFLICT ROLLBACK
-);
+    UNIQUE(name)
+) ;
 
 CREATE TABLE IF NOT EXISTS access_tokens(
-    id INTEGER PRIMARY KEY AUTOINCREMENT,
-    user_id INTEGER NOT NULL,
-    device_id TEXT,
-    token TEXT NOT NULL,
-    last_used INTEGER,
-    FOREIGN KEY(user_id) REFERENCES users(id),
-    UNIQUE(token) ON CONFLICT ROLLBACK
-);
+    id BIGINT PRIMARY KEY,
+    user_id VARCHAR(150) NOT NULL,
+    device_id VARCHAR(150),
+    token VARCHAR(150) NOT NULL,
+    last_used BIGINT,
+    UNIQUE(token)
+) ;
 
 CREATE TABLE IF NOT EXISTS user_ips (
-    user TEXT NOT NULL,
-    access_token TEXT NOT NULL,
-    device_id TEXT,
-    ip TEXT NOT NULL,
-    user_agent TEXT NOT NULL,
-    last_seen INTEGER NOT NULL,
-    CONSTRAINT user_ip UNIQUE (user, access_token, ip, user_agent) ON CONFLICT REPLACE
-);
+    user VARCHAR(150) NOT NULL,
+    access_token VARCHAR(150) NOT NULL,
+    device_id VARCHAR(150),
+    ip VARCHAR(150) NOT NULL,
+    user_agent VARCHAR(150) NOT NULL,
+    last_seen BIGINT NOT NULL
+) ;
 
 CREATE INDEX IF NOT EXISTS user_ips_user ON user_ips(user);
-
+CREATE INDEX IF NOT EXISTS user_ips_user_ip ON user_ips(user, access_token, ip);
diff --git a/synapse/storage/schema/schema_version.sql b/synapse/storage/schema/schema_version.sql
index 0431e2d051..e7fa6fe569 100644
--- a/synapse/storage/schema/schema_version.sql
+++ b/synapse/storage/schema/schema_version.sql
@@ -14,17 +14,16 @@
  */
 
 CREATE TABLE IF NOT EXISTS schema_version(
-    Lock char(1) NOT NULL DEFAULT 'X',  -- Makes sure this table only has one row.
-    version INTEGER NOT NULL,
-    upgraded BOOL NOT NULL,  -- Whether we reached this version from an upgrade or an initial schema.
-    CONSTRAINT schema_version_lock_x CHECK (Lock='X')
-    CONSTRAINT schema_version_lock_uniq UNIQUE (Lock)
+    `Lock` CHAR(1) NOT NULL DEFAULT 'X' UNIQUE,  -- Makes sure this table only has one row.
+    `version` INTEGER NOT NULL,
+    `upgraded` BOOL NOT NULL,  -- Whether we reached this version from an upgrade or an initial schema.
+    CHECK (`Lock`='X')
 );
 
 CREATE TABLE IF NOT EXISTS applied_schema_deltas(
-    version INTEGER NOT NULL,
-    file TEXT NOT NULL,
-    CONSTRAINT schema_deltas_ver_file UNIQUE (version, file) ON CONFLICT IGNORE
+    `version` INTEGER NOT NULL,
+    `file` VARCHAR(150) NOT NULL,
+    UNIQUE(version, file)
 );
 
 CREATE INDEX IF NOT EXISTS schema_deltas_ver ON applied_schema_deltas(version);
diff --git a/synapse/storage/signatures.py b/synapse/storage/signatures.py
index d0d53770f2..f051828630 100644
--- a/synapse/storage/signatures.py
+++ b/synapse/storage/signatures.py
@@ -56,7 +56,6 @@ class SignatureStore(SQLBaseStore):
                 "algorithm": algorithm,
                 "hash": buffer(hash_bytes),
             },
-            or_ignore=True,
         )
 
     def get_event_reference_hashes(self, event_ids):
@@ -100,7 +99,7 @@ class SignatureStore(SQLBaseStore):
             " WHERE event_id = ?"
         )
         txn.execute(query, (event_id, ))
-        return dict(txn.fetchall())
+        return {k: v for k, v in txn.fetchall()}
 
     def _store_event_reference_hash_txn(self, txn, event_id, algorithm,
                                         hash_bytes):
@@ -119,7 +118,6 @@ class SignatureStore(SQLBaseStore):
                 "algorithm": algorithm,
                 "hash": buffer(hash_bytes),
             },
-            or_ignore=True,
         )
 
     def _get_event_signatures_txn(self, txn, event_id):
@@ -164,7 +162,6 @@ class SignatureStore(SQLBaseStore):
                 "key_id": key_id,
                 "signature": buffer(signature_bytes),
             },
-            or_ignore=True,
         )
 
     def _get_prev_event_hashes_txn(self, txn, event_id):
@@ -198,5 +195,4 @@ class SignatureStore(SQLBaseStore):
                 "algorithm": algorithm,
                 "hash": buffer(hash_bytes),
             },
-            or_ignore=True,
         )
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index 58dbf2802b..4994bacd6c 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -17,6 +17,8 @@ from ._base import SQLBaseStore
 
 from twisted.internet import defer
 
+from synapse.util.stringutils import random_string
+
 import logging
 
 logger = logging.getLogger(__name__)
@@ -91,14 +93,15 @@ class StateStore(SQLBaseStore):
 
         state_group = context.state_group
         if not state_group:
-            state_group = self._simple_insert_txn(
+            state_group = _make_group_id(self._clock)
+            self._simple_insert_txn(
                 txn,
                 table="state_groups",
                 values={
+                    "id": state_group,
                     "room_id": event.room_id,
                     "event_id": event.event_id,
                 },
-                or_ignore=True,
             )
 
             for state in state_events.values():
@@ -112,7 +115,6 @@ class StateStore(SQLBaseStore):
                         "state_key": state.state_key,
                         "event_id": state.event_id,
                     },
-                    or_ignore=True,
                 )
 
         self._simple_insert_txn(
@@ -122,7 +124,6 @@ class StateStore(SQLBaseStore):
                 "state_group": state_group,
                 "event_id": event.event_id,
             },
-            or_replace=True,
         )
 
     @defer.inlineCallbacks
@@ -154,3 +155,7 @@ class StateStore(SQLBaseStore):
 
         events = yield self._parse_events(results)
         defer.returnValue(events)
+
+
+def _make_group_id(clock):
+    return str(int(clock.time_msec())) + random_string(5)
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index 66f307e640..9925f04bf7 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -110,7 +110,7 @@ class _StreamToken(namedtuple("_StreamToken", "topological stream")):
         if self.topological is None:
             return "(%d < %s)" % (self.stream, "stream_ordering")
         else:
-            return "(%d < %s OR (%d == %s AND %d < %s))" % (
+            return "(%d < %s OR (%d = %s AND %d < %s))" % (
                 self.topological, "topological_ordering",
                 self.topological, "topological_ordering",
                 self.stream, "stream_ordering",
@@ -120,7 +120,7 @@ class _StreamToken(namedtuple("_StreamToken", "topological stream")):
         if self.topological is None:
             return "(%d >= %s)" % (self.stream, "stream_ordering")
         else:
-            return "(%d > %s OR (%d == %s AND %d >= %s))" % (
+            return "(%d > %s OR (%d = %s AND %d >= %s))" % (
                 self.topological, "topological_ordering",
                 self.topological, "topological_ordering",
                 self.stream, "stream_ordering",
@@ -413,12 +413,10 @@ class StreamStore(SQLBaseStore):
             "get_recent_events_for_room", get_recent_events_for_room_txn
         )
 
-    @cached(num_args=0)
+    @defer.inlineCallbacks
     def get_room_events_max_id(self):
-        return self.runInteraction(
-            "get_room_events_max_id",
-            self._get_room_events_max_id_txn
-        )
+        token = yield self._stream_id_gen.get_max_token(self)
+        defer.returnValue("s%d" % (token,))
 
     @defer.inlineCallbacks
     def _get_min_token(self):
@@ -433,27 +431,6 @@ class StreamStore(SQLBaseStore):
 
         defer.returnValue(self.min_token)
 
-    def get_next_stream_id(self):
-        with self._next_stream_id_lock:
-            i = self._next_stream_id
-            self._next_stream_id += 1
-            return i
-
-    def _get_room_events_max_id_txn(self, txn):
-        txn.execute(
-            "SELECT MAX(stream_ordering) as m FROM events"
-        )
-
-        res = self.cursor_to_dict(txn)
-
-        logger.debug("get_room_events_max_id: %s", res)
-
-        if not res or not res[0] or not res[0]["m"]:
-            return "s0"
-
-        key = res[0]["m"]
-        return "s%d" % (key,)
-
     @staticmethod
     def _set_before_and_after(events, rows):
         for event, row in zip(events, rows):
diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py
index b777395e06..4c3dc58662 100644
--- a/synapse/storage/transactions.py
+++ b/synapse/storage/transactions.py
@@ -13,7 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from ._base import SQLBaseStore, Table, cached
+from ._base import SQLBaseStore, cached
 
 from collections import namedtuple
 
@@ -84,13 +84,18 @@ class TransactionStore(SQLBaseStore):
 
     def _set_received_txn_response(self, txn, transaction_id, origin, code,
                                    response_json):
-        query = (
-            "UPDATE %s "
-            "SET response_code = ?, response_json = ? "
-            "WHERE transaction_id = ? AND origin = ?"
-        ) % ReceivedTransactionsTable.table_name
-
-        txn.execute(query, (code, response_json, transaction_id, origin))
+        self._simple_upsert_txn(
+            txn,
+            table=ReceivedTransactionsTable.table_name,
+            keyvalues={
+                "transaction_id": transaction_id,
+                "origin": origin,
+            },
+            values={
+                "response_code": code,
+                "response_json": response_json,
+            }
+        )
 
     def prep_send_transaction(self, transaction_id, destination,
                               origin_server_ts):
@@ -118,41 +123,38 @@ class TransactionStore(SQLBaseStore):
     def _prep_send_transaction(self, txn, transaction_id, destination,
                                origin_server_ts):
 
+        next_id = self._transaction_id_gen.get_next_txn(txn)
+
         # First we find out what the prev_txns should be.
         # Since we know that we are only sending one transaction at a time,
         # we can simply take the last one.
-        query = "%s ORDER BY id DESC LIMIT 1" % (
-                SentTransactions.select_statement("destination = ?"),
-            )
+        query = (
+            "SELECT * FROM sent_transactions"
+            " WHERE destination = ?"
+            " ORDER BY id DESC LIMIT 1"
+        )
 
-        results = txn.execute(query, (destination,))
-        results = SentTransactions.decode_results(results)
+        txn.execute(query, (destination,))
+        results = self.cursor_to_dict(txn)
 
-        prev_txns = [r.transaction_id for r in results]
+        prev_txns = [r["transaction_id"] for r in results]
 
         # Actually add the new transaction to the sent_transactions table.
 
-        query = SentTransactions.insert_statement()
-        txn.execute(query, SentTransactions.EntryType(
-            None,
-            transaction_id=transaction_id,
-            destination=destination,
-            ts=origin_server_ts,
-            response_code=0,
-            response_json=None
-        ))
-
-        # Update the tx id -> pdu id mapping
-
-        # values = [
-        #     (transaction_id, destination, pdu[0], pdu[1])
-        #     for pdu in pdu_list
-        # ]
-        #
-        # logger.debug("Inserting: %s", repr(values))
-        #
-        # query = TransactionsToPduTable.insert_statement()
-        # txn.executemany(query, values)
+        self._simple_insert_txn(
+            txn,
+            table=SentTransactions.table_name,
+            values={
+                "id": next_id,
+                "transaction_id": transaction_id,
+                "destination": destination,
+                "ts": origin_server_ts,
+                "response_code": 0,
+                "response_json": None,
+            }
+        )
+
+        # TODO Update the tx id -> pdu id mapping
 
         return prev_txns
 
@@ -171,15 +173,20 @@ class TransactionStore(SQLBaseStore):
             transaction_id, destination, code, response_dict
         )
 
-    def _delivered_txn(cls, txn, transaction_id, destination,
+    def _delivered_txn(self, txn, transaction_id, destination,
                        code, response_json):
-        query = (
-            "UPDATE %s "
-            "SET response_code = ?, response_json = ? "
-            "WHERE transaction_id = ? AND destination = ?"
-        ) % SentTransactions.table_name
-
-        txn.execute(query, (code, response_json, transaction_id, destination))
+        self._simple_update_one_txn(
+            txn,
+            table=SentTransactions.table_name,
+            keyvalues={
+                "transaction_id": transaction_id,
+                "destination": destination,
+            },
+            updatevalues={
+                "response_code": code,
+                "response_json": response_json,
+            }
+        )
 
     def get_transactions_after(self, transaction_id, destination):
         """Get all transactions after a given local transaction_id.
@@ -189,25 +196,26 @@ class TransactionStore(SQLBaseStore):
             destination (str)
 
         Returns:
-            list: A list of `ReceivedTransactionsTable.EntryType`
+            list: A list of dicts
         """
         return self.runInteraction(
             "get_transactions_after",
             self._get_transactions_after, transaction_id, destination
         )
 
-    def _get_transactions_after(cls, txn, transaction_id, destination):
-        where = (
-            "destination = ? AND id > (select id FROM %s WHERE "
-            "transaction_id = ? AND destination = ?)"
-        ) % (
-            SentTransactions.table_name
+    def _get_transactions_after(self, txn, transaction_id, destination):
+        query = (
+            "SELECT * FROM sent_transactions"
+            " WHERE destination = ? AND id >"
+            " ("
+            " SELECT id FROM sent_transactions"
+            " WHERE transaction_id = ? AND destination = ?"
+            " )"
         )
-        query = SentTransactions.select_statement(where)
 
         txn.execute(query, (destination, transaction_id, destination))
 
-        return ReceivedTransactionsTable.decode_results(txn.fetchall())
+        return self.cursor_to_dict(txn)
 
     @cached()
     def get_destination_retry_timings(self, destination):
@@ -218,22 +226,27 @@ class TransactionStore(SQLBaseStore):
 
         Returns:
             None if not retrying
-            Otherwise a DestinationsTable.EntryType for the retry scheme
+            Otherwise a dict for the retry scheme
         """
         return self.runInteraction(
             "get_destination_retry_timings",
             self._get_destination_retry_timings, destination)
 
-    def _get_destination_retry_timings(cls, txn, destination):
-        query = DestinationsTable.select_statement("destination = ?")
-        txn.execute(query, (destination,))
-        result = txn.fetchall()
-        if result:
-            result = DestinationsTable.decode_single_result(result)
-            if result.retry_last_ts > 0:
-                return result
-            else:
-                return None
+    def _get_destination_retry_timings(self, txn, destination):
+        result = self._simple_select_one_txn(
+            txn,
+            table=DestinationsTable.table_name,
+            keyvalues={
+                "destination": destination,
+            },
+            retcols=DestinationsTable.fields,
+            allow_none=True,
+        )
+
+        if result and result["retry_last_ts"] > 0:
+            return result
+        else:
+            return None
 
     def set_destination_retry_timings(self, destination,
                                       retry_last_ts, retry_interval):
@@ -249,11 +262,11 @@ class TransactionStore(SQLBaseStore):
         # As this is the new value, we might as well prefill the cache
         self.get_destination_retry_timings.prefill(
             destination,
-            DestinationsTable.EntryType(
-                destination,
-                retry_last_ts,
-                retry_interval
-            )
+            {
+                "destination": destination,
+                "retry_last_ts": retry_last_ts,
+                "retry_interval": retry_interval
+            },
         )
 
         # XXX: we could chose to not bother persisting this if our cache thinks
@@ -266,22 +279,38 @@ class TransactionStore(SQLBaseStore):
             retry_interval,
         )
 
-    def _set_destination_retry_timings(cls, txn, destination,
+    def _set_destination_retry_timings(self, txn, destination,
                                        retry_last_ts, retry_interval):
-
         query = (
-            "INSERT OR REPLACE INTO %s "
-            "(destination, retry_last_ts, retry_interval) "
-            "VALUES (?, ?, ?) "
-        ) % DestinationsTable.table_name
+            "UPDATE destinations"
+            " SET retry_last_ts = ?, retry_interval = ?"
+            " WHERE destination = ?"
+        )
+
+        txn.execute(
+            query,
+            (
+                retry_last_ts, retry_interval, destination,
+            )
+        )
 
-        txn.execute(query, (destination, retry_last_ts, retry_interval))
+        if txn.rowcount == 0:
+            # destination wasn't already in table. Insert it.
+            self._simple_insert_txn(
+                txn,
+                table="destinations",
+                values={
+                    "destination": destination,
+                    "retry_last_ts": retry_last_ts,
+                    "retry_interval": retry_interval,
+                }
+            )
 
     def get_destinations_needing_retry(self):
         """Get all destinations which are due a retry for sending a transaction.
 
         Returns:
-            list: A list of `DestinationsTable.EntryType`
+            list: A list of dicts
         """
 
         return self.runInteraction(
@@ -289,14 +318,17 @@ class TransactionStore(SQLBaseStore):
             self._get_destinations_needing_retry
         )
 
-    def _get_destinations_needing_retry(cls, txn):
-        where = "retry_last_ts > 0 and retry_next_ts < now()"
-        query = DestinationsTable.select_statement(where)
-        txn.execute(query)
-        return DestinationsTable.decode_results(txn.fetchall())
+    def _get_destinations_needing_retry(self, txn):
+        query = (
+            "SELECT * FROM destinations"
+            " WHERE retry_last_ts > 0 and retry_next_ts < ?"
+        )
+
+        txn.execute(query, (self._clock.time_msec(),))
+        return self.cursor_to_dict(txn)
 
 
-class ReceivedTransactionsTable(Table):
+class ReceivedTransactionsTable(object):
     table_name = "received_transactions"
 
     fields = [
@@ -308,10 +340,8 @@ class ReceivedTransactionsTable(Table):
         "has_been_referenced",
     ]
 
-    EntryType = namedtuple("ReceivedTransactionsEntry", fields)
 
-
-class SentTransactions(Table):
+class SentTransactions(object):
     table_name = "sent_transactions"
 
     fields = [
@@ -326,7 +356,7 @@ class SentTransactions(Table):
     EntryType = namedtuple("SentTransactionsEntry", fields)
 
 
-class TransactionsToPduTable(Table):
+class TransactionsToPduTable(object):
     table_name = "transaction_id_to_pdu"
 
     fields = [
@@ -336,10 +366,8 @@ class TransactionsToPduTable(Table):
         "pdu_origin",
     ]
 
-    EntryType = namedtuple("TransactionsToPduEntry", fields)
-
 
-class DestinationsTable(Table):
+class DestinationsTable(object):
     table_name = "destinations"
 
     fields = [
@@ -347,5 +375,3 @@ class DestinationsTable(Table):
         "retry_last_ts",
         "retry_interval",
     ]
-
-    EntryType = namedtuple("DestinationsEntry", fields)
diff --git a/synapse/storage/util/__init__.py b/synapse/storage/util/__init__.py
new file mode 100644
index 0000000000..c488b10d3c
--- /dev/null
+++ b/synapse/storage/util/__init__.py
@@ -0,0 +1,14 @@
+# -*- coding: utf-8 -*-
+# Copyright 2014, 2015 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py
new file mode 100644
index 0000000000..8f419323a7
--- /dev/null
+++ b/synapse/storage/util/id_generators.py
@@ -0,0 +1,126 @@
+# -*- coding: utf-8 -*-
+# Copyright 2014, 2015 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from twisted.internet import defer
+
+from collections import deque
+import contextlib
+import threading
+
+
+class IdGenerator(object):
+    def __init__(self, table, column, store):
+        self.table = table
+        self.column = column
+        self.store = store
+        self._lock = threading.Lock()
+        self._next_id = None
+
+    @defer.inlineCallbacks
+    def get_next(self):
+        with self._lock:
+            if not self._next_id:
+                res = yield self.store._execute_and_decode(
+                    "IdGenerator_%s" % (self.table,),
+                    "SELECT MAX(%s) as mx FROM %s" % (self.column, self.table,)
+                )
+
+                self._next_id = (res and res[0] and res[0]["mx"]) or 1
+
+            i = self._next_id
+            self._next_id += 1
+            defer.returnValue(i)
+
+    def get_next_txn(self, txn):
+        with self._lock:
+            if self._next_id:
+                i = self._next_id
+                self._next_id += 1
+                return i
+            else:
+                txn.execute(
+                    "SELECT MAX(%s) FROM %s" % (self.column, self.table,)
+                )
+
+                val, = txn.fetchone()
+                self._next_id = val or 2
+
+                return 1
+
+
+class StreamIdGenerator(object):
+    """Used to generate new stream ids when persisting events while keeping
+    track of which transactions have been completed.
+
+    This allows us to get the "current" stream id, i.e. the stream id such that
+    all ids less than or equal to it have completed. This handles the fact that
+    persistence of events can complete out of order.
+
+    Usage:
+        with stream_id_gen.get_next_txn(txn) as stream_id:
+            # ... persist event ...
+    """
+    def __init__(self):
+        self._lock = threading.Lock()
+
+        self._current_max = None
+        self._unfinished_ids = deque()
+
+    def get_next_txn(self, txn):
+        """
+        Usage:
+            with stream_id_gen.get_next_txn(txn) as stream_id:
+                # ... persist event ...
+        """
+        with self._lock:
+            if not self._current_max:
+                self._compute_current_max(txn)
+
+            self._current_max += 1
+            next_id = self._current_max
+
+            self._unfinished_ids.append(next_id)
+
+        @contextlib.contextmanager
+        def manager():
+            yield next_id
+            with self._lock:
+                self._unfinished_ids.remove(next_id)
+
+        return manager()
+
+    def get_max_token(self, store):
+        """Returns the maximum stream id such that all stream ids less than or
+        equal to it have been successfully persisted.
+        """
+        with self._lock:
+            if self._unfinished_ids:
+                return self._unfinished_ids[0] - 1
+
+            if not self._current_max:
+                return store.runInteraction(
+                    "_compute_current_max",
+                    self._compute_current_max,
+                )
+
+            return self._current_max
+
+    def _compute_current_max(self, txn):
+        txn.execute("SELECT MAX(stream_ordering) FROM events")
+        val, = txn.fetchone()
+
+        self._current_max = int(val) if val else 1
+
+        return self._current_max