summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rwxr-xr-xsynapse/app/homeserver.py45
-rw-r--r--synapse/config/database.py9
-rw-r--r--synapse/storage/__init__.py115
-rw-r--r--synapse/storage/_base.py29
-rw-r--r--synapse/storage/appservice.py4
-rw-r--r--synapse/storage/directory.py4
-rw-r--r--synapse/storage/event_federation.py25
-rw-r--r--synapse/storage/events.py36
-rw-r--r--synapse/storage/push_rule.py4
-rw-r--r--synapse/storage/registration.py17
-rw-r--r--synapse/storage/room.py4
-rw-r--r--synapse/storage/roommember.py2
-rw-r--r--synapse/storage/schema/delta/12/v12.sql58
-rw-r--r--synapse/storage/schema/delta/13/v13.sql25
-rw-r--r--synapse/storage/schema/delta/14/v14.sql8
-rw-r--r--synapse/storage/schema/full_schemas/11/event_edges.sql68
-rw-r--r--synapse/storage/schema/full_schemas/11/event_signatures.sql54
-rw-r--r--synapse/storage/schema/full_schemas/11/im.sql121
-rw-r--r--synapse/storage/schema/full_schemas/11/keys.sql24
-rw-r--r--synapse/storage/schema/full_schemas/11/media_repository.sql58
-rw-r--r--synapse/storage/schema/full_schemas/11/presence.sql28
-rw-r--r--synapse/storage/schema/full_schemas/11/profiles.sql10
-rw-r--r--synapse/storage/schema/full_schemas/11/redactions.sql8
-rw-r--r--synapse/storage/schema/full_schemas/11/room_aliases.sql16
-rw-r--r--synapse/storage/schema/full_schemas/11/state.sql41
-rw-r--r--synapse/storage/schema/full_schemas/11/transactions.sql46
-rw-r--r--synapse/storage/schema/full_schemas/11/users.sql43
-rw-r--r--synapse/storage/schema/schema_version.sql15
-rw-r--r--synapse/storage/state.py11
-rw-r--r--synapse/storage/stream.py4
-rw-r--r--synapse/storage/transactions.py8
31 files changed, 504 insertions, 436 deletions
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index 500cae05fb..394e93e6c2 100755
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -60,6 +60,8 @@ import re
 import resource
 import subprocess
 import sqlite3
+import yaml
+
 
 logger = logging.getLogger(__name__)
 
@@ -108,14 +110,14 @@ class SynapseHomeServer(HomeServer):
             return None
 
     def build_db_pool(self):
-        return adbapi.ConnectionPool(
-            "sqlite3", self.get_db_name(),
-            check_same_thread=False,
-            cp_min=1,
-            cp_max=1,
-            cp_openfun=prepare_database,  # Prepare the database for each conn
-                                          # so that :memory: sqlite works
-        )
+        name = self.db_config.pop("name", None)
+        if name == "MySQLdb":
+            return adbapi.ConnectionPool(
+                name,
+                **self.db_config
+            )
+
+        raise RuntimeError("Unsupported database type")
 
     def create_resource_tree(self, redirect_root_to_web_client):
         """Create the resource tree for this Home Server.
@@ -358,11 +360,29 @@ def setup(config_options):
 
     tls_context_factory = context_factory.ServerContextFactory(config)
 
+    if config.database_config:
+        with open(config.database_config, 'r') as f:
+            db_config = yaml.safe_load(f)
+
+        name = db_config.get("name", None)
+        if name == "MySQLdb":
+            db_config.update({
+                "sql_mode": "TRADITIONAL",
+                "charset": "utf8",
+                "use_unicode": True,
+            })
+    else:
+        db_config = {
+            "name": "sqlite3",
+            "database": config.database_path,
+        }
+
     hs = SynapseHomeServer(
         config.server_name,
         domain_with_port=domain_with_port,
         upload_dir=os.path.abspath("uploads"),
         db_name=config.database_path,
+        db_config=db_config,
         tls_context_factory=tls_context_factory,
         config=config,
         content_addr=config.content_addr,
@@ -378,9 +398,12 @@ def setup(config_options):
     logger.info("Preparing database: %s...", db_name)
 
     try:
-        with sqlite3.connect(db_name) as db_conn:
-            prepare_sqlite3_database(db_conn)
-            prepare_database(db_conn)
+        # with sqlite3.connect(db_name) as db_conn:
+        #     prepare_sqlite3_database(db_conn)
+        #     prepare_database(db_conn)
+        import MySQLdb
+        db_conn = MySQLdb.connect(**db_config)
+        prepare_database(db_conn)
     except UpgradeDatabaseException:
         sys.stderr.write(
             "\nFailed to upgrade database.\n"
diff --git a/synapse/config/database.py b/synapse/config/database.py
index 87efe54645..8dc9873f8c 100644
--- a/synapse/config/database.py
+++ b/synapse/config/database.py
@@ -26,6 +26,11 @@ class DatabaseConfig(Config):
             self.database_path = self.abspath(args.database_path)
         self.event_cache_size = self.parse_size(args.event_cache_size)
 
+        if args.database_config:
+            self.database_config = self.abspath(args.database_config)
+        else:
+            self.database_config = None
+
     @classmethod
     def add_arguments(cls, parser):
         super(DatabaseConfig, cls).add_arguments(parser)
@@ -38,6 +43,10 @@ class DatabaseConfig(Config):
             "--event-cache-size", default="100K",
             help="Number of events to cache in memory."
         )
+        db_group.add_argument(
+            "--database-config", default=None,
+            help="Location of the database configuration file."
+        )
 
     @classmethod
     def generate_config(cls, args, config_dir_path):
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index 76e7bdfaed..4877f45dce 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -13,6 +13,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from twisted.internet import defer
+
 from .appservice import ApplicationServiceStore
 from .directory import DirectoryStore
 from .events import EventsStore
@@ -41,6 +43,8 @@ import logging
 import os
 import re
 
+import threading
+
 
 logger = logging.getLogger(__name__)
 
@@ -73,6 +77,9 @@ class DataStore(RoomMemberStore, RoomStore,
         self.min_token_deferred = self._get_min_token()
         self.min_token = None
 
+        self._next_stream_id_lock = threading.Lock()
+        self._next_stream_id = int(hs.get_clock().time_msec()) * 1000
+
     def insert_client_ip(self, user, access_token, device_id, ip, user_agent):
         return self._simple_insert(
             "user_ips",
@@ -132,7 +139,7 @@ def prepare_database(db_conn):
         else:
             _setup_new_database(cur)
 
-        cur.execute("PRAGMA user_version = %d" % (SCHEMA_VERSION,))
+        # cur.execute("PRAGMA user_version = %d" % (SCHEMA_VERSION,))
 
         cur.close()
         db_conn.commit()
@@ -195,19 +202,17 @@ def _setup_new_database(cur):
 
     directory_entries = os.listdir(sql_dir)
 
-    sql_script = "BEGIN TRANSACTION;\n"
     for filename in fnmatch.filter(directory_entries, "*.sql"):
         sql_loc = os.path.join(sql_dir, filename)
         logger.debug("Applying schema %s", sql_loc)
-        sql_script += read_schema(sql_loc)
-        sql_script += "\n"
-    sql_script += "COMMIT TRANSACTION;"
-    cur.executescript(sql_script)
+        executescript(cur, sql_loc)
 
     cur.execute(
-        "INSERT OR REPLACE INTO schema_version (version, upgraded)"
-        " VALUES (?,?)",
-        (max_current_ver, False)
+        _convert_param_style(
+            "REPLACE INTO schema_version (version, upgraded)"
+            " VALUES (?,?)"
+        ),
+        (max_current_ver, False,)
     )
 
     _upgrade_existing_database(
@@ -275,6 +280,8 @@ def _upgrade_existing_database(cur, current_version, applied_delta_files,
     if not upgraded:
         start_ver += 1
 
+    logger.debug("applied_delta_files: %s", applied_delta_files)
+
     for v in range(start_ver, SCHEMA_VERSION + 1):
         logger.debug("Upgrading schema to v%d", v)
 
@@ -291,6 +298,7 @@ def _upgrade_existing_database(cur, current_version, applied_delta_files,
         directory_entries.sort()
         for file_name in directory_entries:
             relative_path = os.path.join(str(v), file_name)
+            logger.debug("Found file: %s", relative_path)
             if relative_path in applied_delta_files:
                 continue
 
@@ -312,9 +320,8 @@ def _upgrade_existing_database(cur, current_version, applied_delta_files,
                 module.run_upgrade(cur)
             elif ext == ".sql":
                 # A plain old .sql file, just read and execute it
-                delta_schema = read_schema(absolute_path)
                 logger.debug("Applying schema %s", relative_path)
-                cur.executescript(delta_schema)
+                executescript(cur, absolute_path)
             else:
                 # Not a valid delta file.
                 logger.warn(
@@ -326,24 +333,85 @@ def _upgrade_existing_database(cur, current_version, applied_delta_files,
 
             # Mark as done.
             cur.execute(
-                "INSERT INTO applied_schema_deltas (version, file)"
-                " VALUES (?,?)",
+                _convert_param_style(
+                    "INSERT INTO applied_schema_deltas (version, file)"
+                    " VALUES (?,?)"
+                ),
                 (v, relative_path)
             )
 
             cur.execute(
-                "INSERT OR REPLACE INTO schema_version (version, upgraded)"
-                " VALUES (?,?)",
+                _convert_param_style(
+                    "REPLACE INTO schema_version (version, upgraded)"
+                    " VALUES (?,?)"
+                ),
                 (v, True)
             )
 
 
+def _convert_param_style(sql):
+    return sql.replace("?", "%s")
+
+
+def get_statements(f):
+    statement_buffer = ""
+    in_comment = False  # If we're in a /* ... */ style comment
+
+    for line in f:
+        line = line.strip()
+
+        if in_comment:
+            # Check if this line contains an end to the comment
+            comments = line.split("*/", 1)
+            if len(comments) == 1:
+                continue
+            line = comments[1]
+            in_comment = False
+
+        # Remove inline block comments
+        line = re.sub(r"/\*.*\*/", " ", line)
+
+        # Does this line start a comment?
+        comments = line.split("/*", 1)
+        if len(comments) > 1:
+            line = comments[0]
+            in_comment = True
+
+        # Deal with line comments
+        line = line.split("--", 1)[0]
+        line = line.split("//", 1)[0]
+
+        # Find *all* semicolons. We need to treat first and last entry
+        # specially.
+        statements = line.split(";")
+
+        # We must prepend statement_buffer to the first statement
+        first_statement = "%s %s" % (
+            statement_buffer.strip(),
+            statements[0].strip()
+        )
+        statements[0] = first_statement
+
+        # Every entry, except the last, is a full statement
+        for statement in statements[:-1]:
+            yield statement.strip()
+
+        # The last entry did *not* end in a semicolon, so we store it for the
+        # next semicolon we find
+        statement_buffer = statements[-1].strip()
+
+
+def executescript(txn, schema_path):
+    with open(schema_path, 'r') as f:
+        for statement in get_statements(f):
+            txn.execute(statement)
+
+
 def _get_or_create_schema_state(txn):
     schema_path = os.path.join(
         dir_path, "schema", "schema_version.sql",
     )
-    create_schema = read_schema(schema_path)
-    txn.executescript(create_schema)
+    executescript(txn, schema_path)
 
     txn.execute("SELECT version, upgraded FROM schema_version")
     row = txn.fetchone()
@@ -352,10 +420,13 @@ def _get_or_create_schema_state(txn):
 
     if current_version:
         txn.execute(
-            "SELECT file FROM applied_schema_deltas WHERE version >= ?",
+            _convert_param_style(
+                "SELECT file FROM applied_schema_deltas WHERE version >= ?"
+            ),
             (current_version,)
         )
-        return current_version, txn.fetchall(), upgraded
+        applied_deltas = [d for d, in txn.fetchall()]
+        return current_version, applied_deltas, upgraded
 
     return None
 
@@ -387,7 +458,9 @@ def prepare_sqlite3_database(db_conn):
 
             if row and row[0]:
                 db_conn.execute(
-                    "INSERT OR REPLACE INTO schema_version (version, upgraded)"
-                    " VALUES (?,?)",
+                    _convert_param_style(
+                        "REPLACE INTO schema_version (version, upgraded)"
+                        " VALUES (?,?)"
+                    ),
                     (row[0], False)
                 )
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index e53630a689..d038c55092 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -118,6 +118,10 @@ def cached(max_entries=1000, num_args=1):
     return wrap
 
 
+def _convert_param_style(sql):
+    return sql.replace("?", "%s")
+
+
 class LoggingTransaction(object):
     """An object that almost-transparently proxies for the 'txn' object
     passed to the constructor. Adds logging and metrics to the .execute()
@@ -138,6 +142,8 @@ class LoggingTransaction(object):
         # TODO(paul): Maybe use 'info' and 'debug' for values?
         sql_logger.debug("[SQL] {%s} %s", self.name, sql)
 
+        sql = _convert_param_style(sql)
+
         try:
             if args and args[0]:
                 values = args[0]
@@ -321,11 +327,11 @@ class SQLBaseStore(object):
             The result of decoder(results)
         """
         def interaction(txn):
-            cursor = txn.execute(query, args)
+            txn.execute(query, args)
             if decoder:
-                return decoder(cursor)
+                return decoder(txn)
             else:
-                return cursor.fetchall()
+                return txn.fetchall()
 
         return self.runInteraction(desc, interaction)
 
@@ -354,8 +360,7 @@ class SQLBaseStore(object):
     def _simple_insert_txn(self, txn, table, values, or_replace=False,
                            or_ignore=False):
         sql = "%s INTO %s (%s) VALUES(%s)" % (
-            ("INSERT OR REPLACE" if or_replace else
-             "INSERT OR IGNORE" if or_ignore else "INSERT"),
+            ("REPLACE" if or_replace else "INSERT"),
             table,
             ", ".join(k for k in values),
             ", ".join("?" for k in values)
@@ -468,8 +473,7 @@ class SQLBaseStore(object):
 
     def _simple_select_onecol_txn(self, txn, table, keyvalues, retcol):
         sql = (
-            "SELECT %(retcol)s FROM %(table)s WHERE %(where)s "
-            "ORDER BY rowid asc"
+            "SELECT %(retcol)s FROM %(table)s WHERE %(where)s"
         ) % {
             "retcol": retcol,
             "table": table,
@@ -527,14 +531,14 @@ class SQLBaseStore(object):
             retcols : list of strings giving the names of the columns to return
         """
         if keyvalues:
-            sql = "SELECT %s FROM %s WHERE %s ORDER BY rowid asc" % (
+            sql = "SELECT %s FROM %s WHERE %s" % (
                 ", ".join(retcols),
                 table,
                 " AND ".join("%s = ?" % (k, ) for k in keyvalues)
             )
             txn.execute(sql, keyvalues.values())
         else:
-            sql = "SELECT %s FROM %s ORDER BY rowid asc" % (
+            sql = "SELECT %s FROM %s" % (
                 ", ".join(retcols),
                 table
             )
@@ -627,6 +631,11 @@ class SQLBaseStore(object):
                     updatevalues=updatevalues,
                 )
 
+                # if txn.rowcount == 0:
+                #     raise StoreError(404, "No row found")
+                if txn.rowcount > 1:
+                    raise StoreError(500, "More than one row matched")
+
             return ret
         return self.runInteraction(desc, func)
 
@@ -851,7 +860,7 @@ class Table(object):
 
     _select_where_clause = "SELECT %s FROM %s WHERE %s"
     _select_clause = "SELECT %s FROM %s"
-    _insert_clause = "INSERT OR REPLACE INTO %s (%s) VALUES (%s)"
+    _insert_clause = "REPLACE INTO %s (%s) VALUES (%s)"
 
     @classmethod
     def select_statement(cls, where_clause=None):
diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py
index 850676ce6c..375265d666 100644
--- a/synapse/storage/appservice.py
+++ b/synapse/storage/appservice.py
@@ -147,11 +147,11 @@ class ApplicationServiceStore(SQLBaseStore):
         return True
 
     def _get_as_id_txn(self, txn, token):
-        cursor = txn.execute(
+        txn.execute(
             "SELECT id FROM application_services WHERE token=?",
             (token,)
         )
-        res = cursor.fetchone()
+        res = txn.fetchone()
         if res:
             return res[0]
 
diff --git a/synapse/storage/directory.py b/synapse/storage/directory.py
index 6672752fe0..e31e10186a 100644
--- a/synapse/storage/directory.py
+++ b/synapse/storage/directory.py
@@ -115,12 +115,12 @@ class DirectoryStore(SQLBaseStore):
         )
 
     def _delete_room_alias_txn(self, txn, room_alias):
-        cursor = txn.execute(
+        txn.execute(
             "SELECT room_id FROM room_aliases WHERE room_alias = ?",
             (room_alias.to_string(),)
         )
 
-        res = cursor.fetchone()
+        res = txn.fetchone()
         if res:
             room_id = res[0]
         else:
diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py
index 032334bfd6..79ad5ddc9c 100644
--- a/synapse/storage/event_federation.py
+++ b/synapse/storage/event_federation.py
@@ -242,7 +242,6 @@ class EventFederationStore(SQLBaseStore):
                     "room_id": room_id,
                     "min_depth": depth,
                 },
-                or_replace=True,
             )
 
     def _handle_prev_events(self, txn, outlier, event_id, prev_events,
@@ -262,7 +261,6 @@ class EventFederationStore(SQLBaseStore):
                     "room_id": room_id,
                     "is_state": 0,
                 },
-                or_ignore=True,
             )
 
         # Update the extremities table if this is not an outlier.
@@ -281,19 +279,19 @@ class EventFederationStore(SQLBaseStore):
             # We only insert as a forward extremity the new event if there are
             # no other events that reference it as a prev event
             query = (
-                "INSERT OR IGNORE INTO %(table)s (event_id, room_id) "
-                "SELECT ?, ? WHERE NOT EXISTS ("
-                "SELECT 1 FROM %(event_edges)s WHERE "
-                "prev_event_id = ? "
-                ")"
-            ) % {
-                "table": "event_forward_extremities",
-                "event_edges": "event_edges",
-            }
+                "SELECT 1 FROM event_edges WHERE prev_event_id = ?"
+            )
 
-            logger.debug("query: %s", query)
+            txn.execute(query, (event_id,))
+
+            if not txn.fetchone():
+                query = (
+                    "INSERT INTO event_forward_extremities"
+                    " (event_id, room_id)"
+                    " VALUES (?, ?)"
+                )
 
-            txn.execute(query, (event_id, room_id, event_id))
+                txn.execute(query, (event_id, room_id))
 
             # Insert all the prev_events as a backwards thing, they'll get
             # deleted in a second if they're incorrect anyway.
@@ -306,7 +304,6 @@ class EventFederationStore(SQLBaseStore):
                         "event_id": e_id,
                         "room_id": room_id,
                     },
-                    or_ignore=True,
                 )
 
             # Also delete from the backwards extremities table all ones that
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index a86230d92c..542559c59f 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -114,7 +114,6 @@ class EventsStore(SQLBaseStore):
                         "type": s.type,
                         "state_key": s.state_key,
                     },
-                    or_replace=True,
                 )
 
         if event.is_state() and is_new_state:
@@ -128,7 +127,6 @@ class EventsStore(SQLBaseStore):
                         "type": event.type,
                         "state_key": event.state_key,
                     },
-                    or_replace=True,
                 )
 
                 for prev_state_id, _ in event.prev_state:
@@ -200,8 +198,6 @@ class EventsStore(SQLBaseStore):
 
         if event.type == EventTypes.Member:
             self._store_room_member_txn(txn, event)
-        elif event.type == EventTypes.Feedback:
-            self._store_feedback_txn(txn, event)
         elif event.type == EventTypes.Name:
             self._store_room_name_txn(txn, event)
         elif event.type == EventTypes.Topic:
@@ -227,7 +223,6 @@ class EventsStore(SQLBaseStore):
                 "internal_metadata": metadata_json.decode("UTF-8"),
                 "json": encode_canonical_json(event_dict).decode("UTF-8"),
             },
-            or_replace=True,
         )
 
         content = encode_canonical_json(
@@ -245,8 +240,8 @@ class EventsStore(SQLBaseStore):
             "depth": event.depth,
         }
 
-        if stream_ordering is not None:
-            vals["stream_ordering"] = stream_ordering
+        if stream_ordering is None:
+            stream_ordering = self.get_next_stream_id()
 
         unrec = {
             k: v
@@ -264,21 +259,18 @@ class EventsStore(SQLBaseStore):
             unrec
         ).decode("UTF-8")
 
-        try:
-            self._simple_insert_txn(
-                txn,
-                "events",
-                vals,
-                or_replace=(not outlier),
-                or_ignore=bool(outlier),
-            )
-        except:
-            logger.warn(
-                "Failed to persist, probably duplicate: %s",
-                event.event_id,
-                exc_info=True,
-            )
-            raise _RollbackButIsFineException("_persist_event")
+        sql = (
+            "INSERT INTO events"
+            " (stream_ordering, topological_ordering, event_id, type,"
+            " room_id, content, processed, outlier, depth)"
+            " VALUES (%s,?,?,?,?,?,?,?,?)"
+        ) % (stream_ordering,)
+
+        txn.execute(
+            sql,
+            (event.depth, event.event_id, event.type, event.room_id,
+             content, True, outlier, event.depth)
+        )
 
         if context.rejected:
             self._store_rejections_txn(txn, event.event_id, context.rejected)
diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py
index c47bdc2861..ee7718d5ed 100644
--- a/synapse/storage/push_rule.py
+++ b/synapse/storage/push_rule.py
@@ -154,7 +154,7 @@ class PushRuleStore(SQLBaseStore):
             txn.execute(sql, (user_name, priority_class, new_rule_priority))
 
         # now insert the new rule
-        sql = "INSERT OR REPLACE INTO "+PushRuleTable.table_name+" ("
+        sql = "INSERT INTO "+PushRuleTable.table_name+" ("
         sql += ",".join(new_rule.keys())+") VALUES ("
         sql += ", ".join(["?" for _ in new_rule.keys()])+")"
 
@@ -183,7 +183,7 @@ class PushRuleStore(SQLBaseStore):
         new_rule['priority_class'] = priority_class
         new_rule['priority'] = new_prio
 
-        sql = "INSERT OR REPLACE INTO "+PushRuleTable.table_name+" ("
+        sql = "INSERT INTO "+PushRuleTable.table_name+" ("
         sql += ",".join(new_rule.keys())+") VALUES ("
         sql += ", ".join(["?" for _ in new_rule.keys()])+")"
 
diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py
index f24154f146..fe26d6d62f 100644
--- a/synapse/storage/registration.py
+++ b/synapse/storage/registration.py
@@ -39,17 +39,10 @@ class RegistrationStore(SQLBaseStore):
         Raises:
             StoreError if there was a problem adding this.
         """
-        row = yield self._simple_select_one(
-            "users", {"name": user_id}, ["id"],
-            desc="add_access_token_to_user",
-        )
-        if not row:
-            raise StoreError(400, "Bad user ID supplied.")
-        row_id = row["id"]
         yield self._simple_insert(
             "access_tokens",
             {
-                "user_id": row_id,
+                "user_id": user_id,
                 "token": token
             },
             desc="add_access_token_to_user",
@@ -86,7 +79,7 @@ class RegistrationStore(SQLBaseStore):
         # it's possible for this to get a conflict, but only for a single user
         # since tokens are namespaced based on their user ID
         txn.execute("INSERT INTO access_tokens(user_id, token) " +
-                    "VALUES (?,?)", [txn.lastrowid, token])
+                    "VALUES (?,?)", [user_id, token])
 
     def get_user_by_id(self, user_id):
         query = ("SELECT users.name, users.password_hash FROM users"
@@ -134,12 +127,12 @@ class RegistrationStore(SQLBaseStore):
             "SELECT users.name, users.admin,"
             " access_tokens.device_id, access_tokens.id as token_id"
             " FROM users"
-            " INNER JOIN access_tokens on users.id = access_tokens.user_id"
+            " INNER JOIN access_tokens on users.name = access_tokens.user_id"
             " WHERE token = ?"
         )
 
-        cursor = txn.execute(sql, (token,))
-        rows = self.cursor_to_dict(cursor)
+        txn.execute(sql, (token,))
+        rows = self.cursor_to_dict(txn)
         if rows:
             return rows[0]
 
diff --git a/synapse/storage/room.py b/synapse/storage/room.py
index be3e28c2ea..8641033327 100644
--- a/synapse/storage/room.py
+++ b/synapse/storage/room.py
@@ -117,9 +117,9 @@ class RoomStore(SQLBaseStore):
                 "name": name_subquery,
             }
 
-            c = txn.execute(sql, (is_public,))
+            txn.execute(sql, (is_public,))
 
-            return c.fetchall()
+            return txn.fetchall()
 
         rows = yield self.runInteraction(
             "get_rooms", f
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 52c37c76f5..17ee4bb9ec 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -68,7 +68,7 @@ class RoomMemberStore(SQLBaseStore):
         # Update room hosts table
         if event.membership == Membership.JOIN:
             sql = (
-                "INSERT OR IGNORE INTO room_hosts (room_id, host) "
+                "REPLACE INTO room_hosts (room_id, host) "
                 "VALUES (?, ?)"
             )
             txn.execute(sql, (event.room_id, domain))
diff --git a/synapse/storage/schema/delta/12/v12.sql b/synapse/storage/schema/delta/12/v12.sql
index b87ef1fe79..b526109e6e 100644
--- a/synapse/storage/schema/delta/12/v12.sql
+++ b/synapse/storage/schema/delta/12/v12.sql
@@ -14,54 +14,50 @@
  */
 
 CREATE TABLE IF NOT EXISTS rejections(
-    event_id TEXT NOT NULL,
-    reason TEXT NOT NULL,
-    last_check TEXT NOT NULL,
-    CONSTRAINT ev_id UNIQUE (event_id) ON CONFLICT REPLACE
-);
+    event_id VARCHAR(255) NOT NULL,
+    reason VARCHAR(255) NOT NULL,
+    last_check VARCHAR(255) NOT NULL,
+    UNIQUE (event_id)
+) ENGINE = INNODB;
 
 -- Push notification endpoints that users have configured
 CREATE TABLE IF NOT EXISTS pushers (
-  id INTEGER PRIMARY KEY AUTOINCREMENT,
-  user_name TEXT NOT NULL,
-  profile_tag varchar(32) NOT NULL,
-  kind varchar(8) NOT NULL,
-  app_id varchar(64) NOT NULL,
-  app_display_name varchar(64) NOT NULL,
-  device_display_name varchar(128) NOT NULL,
-  pushkey blob NOT NULL,
+  id BIGINT PRIMARY KEY,
+  user_name VARCHAR(255) NOT NULL,
+  profile_tag VARCHAR(32) NOT NULL,
+  kind VARCHAR(8) NOT NULL,
+  app_id VARCHAR(64) NOT NULL,
+  app_display_name VARCHAR(64) NOT NULL,
+  device_display_name VARCHAR(128) NOT NULL,
+  pushkey VARBINARY(512) NOT NULL,
   ts BIGINT NOT NULL,
-  lang varchar(8),
-  data blob,
+  lang VARCHAR(8),
+  data BLOB,
   last_token TEXT,
   last_success BIGINT,
   failing_since BIGINT,
-  FOREIGN KEY(user_name) REFERENCES users(name),
   UNIQUE (app_id, pushkey)
-);
+) ENGINE = INNODB;
 
 CREATE TABLE IF NOT EXISTS push_rules (
-  id INTEGER PRIMARY KEY AUTOINCREMENT,
-  user_name TEXT NOT NULL,
-  rule_id TEXT NOT NULL,
+  id BIGINT PRIMARY KEY,
+  user_name VARCHAR(255) NOT NULL,
+  rule_id VARCHAR(255) NOT NULL,
   priority_class TINYINT NOT NULL,
   priority INTEGER NOT NULL DEFAULT 0,
-  conditions TEXT NOT NULL,
-  actions TEXT NOT NULL,
+  conditions VARCHAR(255) NOT NULL,
+  actions VARCHAR(255) NOT NULL,
   UNIQUE(user_name, rule_id)
-);
+) ENGINE = INNODB;
 
 CREATE INDEX IF NOT EXISTS push_rules_user_name on push_rules (user_name);
 
 CREATE TABLE IF NOT EXISTS user_filters(
-  user_id TEXT,
-  filter_id INTEGER,
-  filter_json TEXT,
-  FOREIGN KEY(user_id) REFERENCES users(id)
-);
+  user_id VARCHAR(255),
+  filter_id BIGINT,
+  filter_json BLOB
+) ENGINE = INNODB;
 
 CREATE INDEX IF NOT EXISTS user_filters_by_user_id_filter_id ON user_filters(
-  user_id, filter_id
+    user_id, filter_id
 );
-
-PRAGMA user_version = 12;
diff --git a/synapse/storage/schema/delta/13/v13.sql b/synapse/storage/schema/delta/13/v13.sql
index e491ad5aec..f0a5daf445 100644
--- a/synapse/storage/schema/delta/13/v13.sql
+++ b/synapse/storage/schema/delta/13/v13.sql
@@ -14,21 +14,18 @@
  */
 
 CREATE TABLE IF NOT EXISTS application_services(
-    id INTEGER PRIMARY KEY AUTOINCREMENT,
-    url TEXT,
-    token TEXT,
-    hs_token TEXT,
-    sender TEXT,
-    UNIQUE(token) ON CONFLICT ROLLBACK
-);
+    id BIGINT PRIMARY KEY,
+    url VARCHAR(255),
+    token VARCHAR(255),
+    hs_token VARCHAR(255),
+    sender VARCHAR(255),
+    UNIQUE(token)
+) ENGINE = INNODB;
 
 CREATE TABLE IF NOT EXISTS application_services_regex(
-    id INTEGER PRIMARY KEY AUTOINCREMENT,
-    as_id INTEGER NOT NULL,
+    id BIGINT PRIMARY KEY,
+    as_id BIGINT NOT NULL,
     namespace INTEGER,  /* enum[room_id|room_alias|user_id] */
-    regex TEXT,
+    regex VARCHAR(255),
     FOREIGN KEY(as_id) REFERENCES application_services(id)
-);
-
-
-
+) ENGINE = INNODB;
diff --git a/synapse/storage/schema/delta/14/v14.sql b/synapse/storage/schema/delta/14/v14.sql
index 0212726448..a1260c5c1f 100644
--- a/synapse/storage/schema/delta/14/v14.sql
+++ b/synapse/storage/schema/delta/14/v14.sql
@@ -1,9 +1,9 @@
 CREATE TABLE IF NOT EXISTS push_rules_enable (
-  id INTEGER PRIMARY KEY AUTOINCREMENT,
-  user_name TEXT NOT NULL,
-  rule_id TEXT NOT NULL,
+  id BIGINT PRIMARY KEY,
+  user_name VARCHAR(255) NOT NULL,
+  rule_id VARCHAR(255) NOT NULL,
   enabled TINYINT,
   UNIQUE(user_name, rule_id)
-);
+) ENGINE = INNODB;
 
 CREATE INDEX IF NOT EXISTS push_rules_enable_user_name on push_rules_enable (user_name);
diff --git a/synapse/storage/schema/full_schemas/11/event_edges.sql b/synapse/storage/schema/full_schemas/11/event_edges.sql
index 1e766d6db2..0f53488e92 100644
--- a/synapse/storage/schema/full_schemas/11/event_edges.sql
+++ b/synapse/storage/schema/full_schemas/11/event_edges.sql
@@ -14,63 +14,63 @@
  */
 
 CREATE TABLE IF NOT EXISTS event_forward_extremities(
-    event_id TEXT NOT NULL,
-    room_id TEXT NOT NULL,
-    CONSTRAINT uniqueness UNIQUE (event_id, room_id) ON CONFLICT REPLACE
-);
+    event_id VARCHAR(255) NOT NULL,
+    room_id VARCHAR(255) NOT NULL,
+    UNIQUE (event_id, room_id)
+) ENGINE = INNODB;
 
 CREATE INDEX IF NOT EXISTS ev_extrem_room ON event_forward_extremities(room_id);
 CREATE INDEX IF NOT EXISTS ev_extrem_id ON event_forward_extremities(event_id);
 
 
 CREATE TABLE IF NOT EXISTS event_backward_extremities(
-    event_id TEXT NOT NULL,
-    room_id TEXT NOT NULL,
-    CONSTRAINT uniqueness UNIQUE (event_id, room_id) ON CONFLICT REPLACE
-);
+    event_id VARCHAR(255) NOT NULL,
+    room_id VARCHAR(255) NOT NULL,
+    UNIQUE (event_id, room_id)
+) ENGINE = INNODB;
 
 CREATE INDEX IF NOT EXISTS ev_b_extrem_room ON event_backward_extremities(room_id);
 CREATE INDEX IF NOT EXISTS ev_b_extrem_id ON event_backward_extremities(event_id);
 
 
 CREATE TABLE IF NOT EXISTS event_edges(
-    event_id TEXT NOT NULL,
-    prev_event_id TEXT NOT NULL,
-    room_id TEXT NOT NULL,
-    is_state INTEGER NOT NULL,
-    CONSTRAINT uniqueness UNIQUE (event_id, prev_event_id, room_id, is_state)
-);
+    event_id VARCHAR(255) NOT NULL,
+    prev_event_id VARCHAR(255) NOT NULL,
+    room_id VARCHAR(255) NOT NULL,
+    is_state BOOL NOT NULL,
+    UNIQUE (event_id, prev_event_id, room_id, is_state)
+) ENGINE = INNODB;
 
 CREATE INDEX IF NOT EXISTS ev_edges_id ON event_edges(event_id);
 CREATE INDEX IF NOT EXISTS ev_edges_prev_id ON event_edges(prev_event_id);
 
 
 CREATE TABLE IF NOT EXISTS room_depth(
-    room_id TEXT NOT NULL,
+    room_id VARCHAR(255) NOT NULL,
     min_depth INTEGER NOT NULL,
-    CONSTRAINT uniqueness UNIQUE (room_id)
-);
+    UNIQUE (room_id)
+) ENGINE = INNODB;
 
 CREATE INDEX IF NOT EXISTS room_depth_room ON room_depth(room_id);
 
 
 create TABLE IF NOT EXISTS event_destinations(
-    event_id TEXT NOT NULL,
-    destination TEXT NOT NULL,
-    delivered_ts INTEGER DEFAULT 0, -- or 0 if not delivered
-    CONSTRAINT uniqueness UNIQUE (event_id, destination) ON CONFLICT REPLACE
-);
+    event_id VARCHAR(255) NOT NULL,
+    destination VARCHAR(255) NOT NULL,
+    delivered_ts BIGINT DEFAULT 0, -- or 0 if not delivered
+    UNIQUE (event_id, destination)
+) ENGINE = INNODB;
 
 CREATE INDEX IF NOT EXISTS event_destinations_id ON event_destinations(event_id);
 
 
 CREATE TABLE IF NOT EXISTS state_forward_extremities(
-    event_id TEXT NOT NULL,
-    room_id TEXT NOT NULL,
-    type TEXT NOT NULL,
-    state_key TEXT NOT NULL,
-    CONSTRAINT uniqueness UNIQUE (event_id, room_id) ON CONFLICT REPLACE
-);
+    event_id VARCHAR(255) NOT NULL,
+    room_id VARCHAR(255) NOT NULL,
+    type VARCHAR(255) NOT NULL,
+    state_key VARCHAR(255) NOT NULL,
+    UNIQUE (event_id, room_id)
+) ENGINE = INNODB;
 
 CREATE INDEX IF NOT EXISTS st_extrem_keys ON state_forward_extremities(
     room_id, type, state_key
@@ -79,11 +79,11 @@ CREATE INDEX IF NOT EXISTS st_extrem_id ON state_forward_extremities(event_id);
 
 
 CREATE TABLE IF NOT EXISTS event_auth(
-    event_id TEXT NOT NULL,
-    auth_id TEXT NOT NULL,
-    room_id TEXT NOT NULL,
-    CONSTRAINT uniqueness UNIQUE (event_id, auth_id, room_id)
-);
+    event_id VARCHAR(255) NOT NULL,
+    auth_id VARCHAR(255) NOT NULL,
+    room_id VARCHAR(255) NOT NULL,
+    UNIQUE (event_id, auth_id, room_id)
+) ENGINE = INNODB;
 
 CREATE INDEX IF NOT EXISTS evauth_edges_id ON event_auth(event_id);
-CREATE INDEX IF NOT EXISTS evauth_edges_auth_id ON event_auth(auth_id);
\ No newline at end of file
+CREATE INDEX IF NOT EXISTS evauth_edges_auth_id ON event_auth(auth_id);
diff --git a/synapse/storage/schema/full_schemas/11/event_signatures.sql b/synapse/storage/schema/full_schemas/11/event_signatures.sql
index c28c39c48a..334d7c8680 100644
--- a/synapse/storage/schema/full_schemas/11/event_signatures.sql
+++ b/synapse/storage/schema/full_schemas/11/event_signatures.sql
@@ -14,52 +14,42 @@
  */
 
 CREATE TABLE IF NOT EXISTS event_content_hashes (
-    event_id TEXT,
-    algorithm TEXT,
+    event_id VARCHAR(255),
+    algorithm VARCHAR(255),
     hash BLOB,
-    CONSTRAINT uniqueness UNIQUE (event_id, algorithm)
-);
+    UNIQUE (event_id, algorithm)
+) ENGINE = INNODB;
 
-CREATE INDEX IF NOT EXISTS event_content_hashes_id ON event_content_hashes(
-    event_id
-);
+CREATE INDEX IF NOT EXISTS event_content_hashes_id ON event_content_hashes(event_id);
 
 
 CREATE TABLE IF NOT EXISTS event_reference_hashes (
-    event_id TEXT,
-    algorithm TEXT,
+    event_id VARCHAR(255),
+    algorithm VARCHAR(255),
     hash BLOB,
-    CONSTRAINT uniqueness UNIQUE (event_id, algorithm)
-);
+    UNIQUE (event_id, algorithm)
+) ENGINE = INNODB;
 
-CREATE INDEX IF NOT EXISTS event_reference_hashes_id ON event_reference_hashes (
-    event_id
-);
+CREATE INDEX IF NOT EXISTS event_reference_hashes_id ON event_reference_hashes(event_id);
 
 
 CREATE TABLE IF NOT EXISTS event_signatures (
-    event_id TEXT,
-    signature_name TEXT,
-    key_id TEXT,
+    event_id VARCHAR(255),
+    signature_name VARCHAR(255),
+    key_id VARCHAR(255),
     signature BLOB,
-    CONSTRAINT uniqueness UNIQUE (event_id, signature_name, key_id)
-);
+    UNIQUE (event_id, signature_name, key_id)
+) ENGINE = INNODB;
 
-CREATE INDEX IF NOT EXISTS event_signatures_id ON event_signatures (
-    event_id
-);
+CREATE INDEX IF NOT EXISTS event_signatures_id ON event_signatures(event_id);
 
 
 CREATE TABLE IF NOT EXISTS event_edge_hashes(
-    event_id TEXT,
-    prev_event_id TEXT,
-    algorithm TEXT,
+    event_id VARCHAR(255),
+    prev_event_id VARCHAR(255),
+    algorithm VARCHAR(255),
     hash BLOB,
-    CONSTRAINT uniqueness UNIQUE (
-        event_id, prev_event_id, algorithm
-    )
-);
+    UNIQUE (event_id, prev_event_id, algorithm)
+) ENGINE = INNODB;
 
-CREATE INDEX IF NOT EXISTS event_edge_hashes_id ON event_edge_hashes(
-    event_id
-);
+CREATE INDEX IF NOT EXISTS event_edge_hashes_id ON event_edge_hashes(event_id);
diff --git a/synapse/storage/schema/full_schemas/11/im.sql b/synapse/storage/schema/full_schemas/11/im.sql
index dd00c1cd2f..9849e969be 100644
--- a/synapse/storage/schema/full_schemas/11/im.sql
+++ b/synapse/storage/schema/full_schemas/11/im.sql
@@ -14,112 +14,111 @@
  */
 
 CREATE TABLE IF NOT EXISTS events(
-    stream_ordering INTEGER PRIMARY KEY AUTOINCREMENT,
-    topological_ordering INTEGER NOT NULL,
-    event_id TEXT NOT NULL,
-    type TEXT NOT NULL,
-    room_id TEXT NOT NULL,
-    content TEXT NOT NULL,
-    unrecognized_keys TEXT,
+    stream_ordering BIGINT PRIMARY KEY,
+    topological_ordering BIGINT NOT NULL,
+    event_id VARCHAR(255) NOT NULL,
+    type VARCHAR(255) NOT NULL,
+    room_id VARCHAR(255) NOT NULL,
+    content BLOB NOT NULL,
+    unrecognized_keys BLOB,
     processed BOOL NOT NULL,
     outlier BOOL NOT NULL,
-    depth INTEGER DEFAULT 0 NOT NULL,
-    CONSTRAINT ev_uniq UNIQUE (event_id)
-);
+    depth BIGINT DEFAULT 0 NOT NULL,
+    UNIQUE (event_id)
+) ENGINE = INNODB;
 
-CREATE INDEX IF NOT EXISTS events_event_id ON events (event_id);
 CREATE INDEX IF NOT EXISTS events_stream_ordering ON events (stream_ordering);
 CREATE INDEX IF NOT EXISTS events_topological_ordering ON events (topological_ordering);
 CREATE INDEX IF NOT EXISTS events_room_id ON events (room_id);
 
 
 CREATE TABLE IF NOT EXISTS event_json(
-    event_id TEXT NOT NULL,
-    room_id TEXT NOT NULL,
-    internal_metadata NOT NULL,
+    event_id VARCHAR(255) NOT NULL,
+    room_id VARCHAR(255) NOT NULL,
+    internal_metadata BLOB NOT NULL,
     json BLOB NOT NULL,
-    CONSTRAINT ev_j_uniq UNIQUE (event_id)
-);
+    UNIQUE (event_id)
+) ENGINE = INNODB;
 
-CREATE INDEX IF NOT EXISTS event_json_id ON event_json(event_id);
 CREATE INDEX IF NOT EXISTS event_json_room_id ON event_json(room_id);
 
 
 CREATE TABLE IF NOT EXISTS state_events(
-    event_id TEXT NOT NULL,
-    room_id TEXT NOT NULL,
-    type TEXT NOT NULL,
-    state_key TEXT NOT NULL,
-    prev_state TEXT
-);
+    event_id VARCHAR(255) NOT NULL,
+    room_id VARCHAR(255) NOT NULL,
+    type VARCHAR(255) NOT NULL,
+    state_key VARCHAR(255) NOT NULL,
+    prev_state VARCHAR(255),
+    UNIQUE (event_id)
+) ENGINE = INNODB;
 
-CREATE UNIQUE INDEX IF NOT EXISTS state_events_event_id ON state_events (event_id);
 CREATE INDEX IF NOT EXISTS state_events_room_id ON state_events (room_id);
 CREATE INDEX IF NOT EXISTS state_events_type ON state_events (type);
 CREATE INDEX IF NOT EXISTS state_events_state_key ON state_events (state_key);
 
 
 CREATE TABLE IF NOT EXISTS current_state_events(
-    event_id TEXT NOT NULL,
-    room_id TEXT NOT NULL,
-    type TEXT NOT NULL,
-    state_key TEXT NOT NULL,
-    CONSTRAINT curr_uniq UNIQUE (room_id, type, state_key) ON CONFLICT REPLACE
-);
+    event_id VARCHAR(255) NOT NULL,
+    room_id VARCHAR(255) NOT NULL,
+    type VARCHAR(255) NOT NULL,
+    state_key VARCHAR(255) NOT NULL,
+    UNIQUE (event_id),
+    UNIQUE (room_id, type, state_key)
+) ENGINE = INNODB;
 
-CREATE INDEX IF NOT EXISTS curr_events_event_id ON current_state_events (event_id);
 CREATE INDEX IF NOT EXISTS current_state_events_room_id ON current_state_events (room_id);
 CREATE INDEX IF NOT EXISTS current_state_events_type ON current_state_events (type);
 CREATE INDEX IF NOT EXISTS current_state_events_state_key ON current_state_events (state_key);
 
 CREATE TABLE IF NOT EXISTS room_memberships(
-    event_id TEXT NOT NULL,
-    user_id TEXT NOT NULL,
-    sender TEXT NOT NULL,
-    room_id TEXT NOT NULL,
-    membership TEXT NOT NULL
-);
+    event_id VARCHAR(255) NOT NULL,
+    user_id VARCHAR(255) NOT NULL,
+    sender VARCHAR(255) NOT NULL,
+    room_id VARCHAR(255) NOT NULL,
+    membership VARCHAR(255) NOT NULL,
+    UNIQUE (event_id)
+) ENGINE = INNODB;
 
-CREATE INDEX IF NOT EXISTS room_memberships_event_id ON room_memberships (event_id);
 CREATE INDEX IF NOT EXISTS room_memberships_room_id ON room_memberships (room_id);
 CREATE INDEX IF NOT EXISTS room_memberships_user_id ON room_memberships (user_id);
 
 CREATE TABLE IF NOT EXISTS feedback(
-    event_id TEXT NOT NULL,
-    feedback_type TEXT,
-    target_event_id TEXT,
-    sender TEXT,
-    room_id TEXT
-);
+    event_id VARCHAR(255) NOT NULL,
+    feedback_type VARCHAR(255),
+    target_event_id VARCHAR(255),
+    sender VARCHAR(255),
+    room_id VARCHAR(255),
+    UNIQUE (event_id)
+) ENGINE = INNODB;
 
 CREATE TABLE IF NOT EXISTS topics(
-    event_id TEXT NOT NULL,
-    room_id TEXT NOT NULL,
-    topic TEXT NOT NULL
-);
+    event_id VARCHAR(255) NOT NULL,
+    room_id VARCHAR(255) NOT NULL,
+    topic VARCHAR(255) NOT NULL,
+    UNIQUE (event_id)
+) ENGINE = INNODB;
 
-CREATE INDEX IF NOT EXISTS topics_event_id ON topics(event_id);
 CREATE INDEX IF NOT EXISTS topics_room_id ON topics(room_id);
 
 CREATE TABLE IF NOT EXISTS room_names(
-    event_id TEXT NOT NULL,
-    room_id TEXT NOT NULL,
-    name TEXT NOT NULL
+    event_id VARCHAR(255) NOT NULL,
+    room_id VARCHAR(255) NOT NULL,
+    name VARCHAR(255) NOT NULL,
+    UNIQUE (event_id)
 );
 
-CREATE INDEX IF NOT EXISTS room_names_event_id ON room_names(event_id);
 CREATE INDEX IF NOT EXISTS room_names_room_id ON room_names(room_id);
 
 CREATE TABLE IF NOT EXISTS rooms(
-    room_id TEXT PRIMARY KEY NOT NULL,
-    is_public INTEGER,
-    creator TEXT
-);
+    room_id VARCHAR(255) PRIMARY KEY NOT NULL,
+    is_public BOOL,
+    creator VARCHAR(255)
+) ENGINE = INNODB;
 
 CREATE TABLE IF NOT EXISTS room_hosts(
-    room_id TEXT NOT NULL,
-    host TEXT NOT NULL,
-    CONSTRAINT room_hosts_uniq UNIQUE (room_id, host) ON CONFLICT IGNORE
-);
+    room_id VARCHAR(255) NOT NULL,
+    host VARCHAR(255) NOT NULL,
+    UNIQUE (room_id, host)
+) ENGINE = INNODB;
 
 CREATE INDEX IF NOT EXISTS room_hosts_room_id ON room_hosts (room_id);
diff --git a/synapse/storage/schema/full_schemas/11/keys.sql b/synapse/storage/schema/full_schemas/11/keys.sql
index a9e0a4fe0d..c0f2ec29bb 100644
--- a/synapse/storage/schema/full_schemas/11/keys.sql
+++ b/synapse/storage/schema/full_schemas/11/keys.sql
@@ -13,19 +13,19 @@
  * limitations under the License.
  */
 CREATE TABLE IF NOT EXISTS server_tls_certificates(
-  server_name TEXT, -- Server name.
-  fingerprint TEXT, -- Certificate fingerprint.
-  from_server TEXT, -- Which key server the certificate was fetched from.
-  ts_added_ms INTEGER, -- When the certifcate was added.
+  server_name VARCHAR(255), -- Server name.
+  fingerprint VARCHAR(255), -- Certificate fingerprint.
+  from_server VARCHAR(255), -- Which key server the certificate was fetched from.
+  ts_added_ms BIGINT, -- When the certifcate was added.
   tls_certificate BLOB, -- DER encoded x509 certificate.
-  CONSTRAINT uniqueness UNIQUE (server_name, fingerprint)
-);
+  UNIQUE (server_name, fingerprint)
+) ENGINE = INNODB;
 
 CREATE TABLE IF NOT EXISTS server_signature_keys(
-  server_name TEXT, -- Server name.
-  key_id TEXT, -- Key version.
-  from_server TEXT, -- Which key server the key was fetched form.
-  ts_added_ms INTEGER, -- When the key was added.
+  server_name VARCHAR(255), -- Server name.
+  key_id VARCHAR(255), -- Key version.
+  from_server VARCHAR(255), -- Which key server the key was fetched form.
+  ts_added_ms BIGINT, -- When the key was added.
   verify_key BLOB, -- NACL verification key.
-  CONSTRAINT uniqueness UNIQUE (server_name, key_id)
-);
+  UNIQUE (server_name, key_id)
+) ENGINE = INNODB;
diff --git a/synapse/storage/schema/full_schemas/11/media_repository.sql b/synapse/storage/schema/full_schemas/11/media_repository.sql
index afdf48cbfb..8bc84dc24d 100644
--- a/synapse/storage/schema/full_schemas/11/media_repository.sql
+++ b/synapse/storage/schema/full_schemas/11/media_repository.sql
@@ -14,55 +14,55 @@
  */
 
 CREATE TABLE IF NOT EXISTS local_media_repository (
-    media_id TEXT, -- The id used to refer to the media.
-    media_type TEXT, -- The MIME-type of the media.
+    media_id VARCHAR(255), -- The id used to refer to the media.
+    media_type VARCHAR(255), -- The MIME-type of the media.
     media_length INTEGER, -- Length of the media in bytes.
-    created_ts INTEGER, -- When the content was uploaded in ms.
-    upload_name TEXT, -- The name the media was uploaded with.
-    user_id TEXT, -- The user who uploaded the file.
-    CONSTRAINT uniqueness UNIQUE (media_id)
-);
+    created_ts BIGINT, -- When the content was uploaded in ms.
+    upload_name VARCHAR(255), -- The name the media was uploaded with.
+    user_id VARCHAR(255), -- The user who uploaded the file.
+    UNIQUE (media_id)
+) ENGINE = INNODB;
 
 CREATE TABLE IF NOT EXISTS local_media_repository_thumbnails (
-    media_id TEXT, -- The id used to refer to the media.
+    media_id VARCHAR(255), -- The id used to refer to the media.
     thumbnail_width INTEGER, -- The width of the thumbnail in pixels.
     thumbnail_height INTEGER, -- The height of the thumbnail in pixels.
-    thumbnail_type TEXT, -- The MIME-type of the thumbnail.
-    thumbnail_method TEXT, -- The method used to make the thumbnail.
+    thumbnail_type VARCHAR(255), -- The MIME-type of the thumbnail.
+    thumbnail_method VARCHAR(255), -- The method used to make the thumbnail.
     thumbnail_length INTEGER, -- The length of the thumbnail in bytes.
-    CONSTRAINT uniqueness UNIQUE (
+    UNIQUE (
         media_id, thumbnail_width, thumbnail_height, thumbnail_type
     )
-);
+) ENGINE = INNODB;
 
 CREATE INDEX IF NOT EXISTS local_media_repository_thumbnails_media_id
     ON local_media_repository_thumbnails (media_id);
 
 CREATE TABLE IF NOT EXISTS remote_media_cache (
-    media_origin TEXT, -- The remote HS the media came from.
-    media_id TEXT, -- The id used to refer to the media on that server.
-    media_type TEXT, -- The MIME-type of the media.
-    created_ts INTEGER, -- When the content was uploaded in ms.
-    upload_name TEXT, -- The name the media was uploaded with.
+    media_origin VARCHAR(255), -- The remote HS the media came from.
+    media_id VARCHAR(255), -- The id used to refer to the media on that server.
+    media_type VARCHAR(255), -- The MIME-type of the media.
+    created_ts BIGINT, -- When the content was uploaded in ms.
+    upload_name VARCHAR(255), -- The name the media was uploaded with.
     media_length INTEGER, -- Length of the media in bytes.
-    filesystem_id TEXT, -- The name used to store the media on disk.
-    CONSTRAINT uniqueness UNIQUE (media_origin, media_id)
-);
+    filesystem_id VARCHAR(255), -- The name used to store the media on disk.
+    UNIQUE (media_origin, media_id)
+) ENGINE = INNODB;
 
 CREATE TABLE IF NOT EXISTS remote_media_cache_thumbnails (
-    media_origin TEXT, -- The remote HS the media came from.
-    media_id TEXT, -- The id used to refer to the media.
+    media_origin VARCHAR(255), -- The remote HS the media came from.
+    media_id VARCHAR(255), -- The id used to refer to the media.
     thumbnail_width INTEGER, -- The width of the thumbnail in pixels.
     thumbnail_height INTEGER, -- The height of the thumbnail in pixels.
-    thumbnail_method TEXT, -- The method used to make the thumbnail
-    thumbnail_type TEXT, -- The MIME-type of the thumbnail.
+    thumbnail_method VARCHAR(255), -- The method used to make the thumbnail
+    thumbnail_type VARCHAR(255), -- The MIME-type of the thumbnail.
     thumbnail_length INTEGER, -- The length of the thumbnail in bytes.
-    filesystem_id TEXT, -- The name used to store the media on disk.
-    CONSTRAINT uniqueness UNIQUE (
+    filesystem_id VARCHAR(255), -- The name used to store the media on disk.
+    UNIQUE (
         media_origin, media_id, thumbnail_width, thumbnail_height,
-        thumbnail_type, thumbnail_type
-    )
-);
+        thumbnail_type
+     )
+) ENGINE = INNODB;
 
 CREATE INDEX IF NOT EXISTS remote_media_cache_thumbnails_media_id
     ON local_media_repository_thumbnails (media_id);
diff --git a/synapse/storage/schema/full_schemas/11/presence.sql b/synapse/storage/schema/full_schemas/11/presence.sql
index f9f8db9697..8031321083 100644
--- a/synapse/storage/schema/full_schemas/11/presence.sql
+++ b/synapse/storage/schema/full_schemas/11/presence.sql
@@ -13,26 +13,26 @@
  * limitations under the License.
  */
 CREATE TABLE IF NOT EXISTS presence(
-  user_id INTEGER NOT NULL,
-  state INTEGER,
-  status_msg TEXT,
-  mtime INTEGER, -- miliseconds since last state change
-  FOREIGN KEY(user_id) REFERENCES users(id)
-);
+  user_id VARCHAR(255) NOT NULL,
+  state VARCHAR(20),
+  status_msg VARCHAR(255),
+  mtime BIGINT, -- miliseconds since last state change
+  UNIQUE(user_id)
+) ENGINE = INNODB;
 
 -- For each of /my/ users which possibly-remote users are allowed to see their
 -- presence state
 CREATE TABLE IF NOT EXISTS presence_allow_inbound(
-  observed_user_id INTEGER NOT NULL,
-  observer_user_id TEXT, -- a UserID,
-  FOREIGN KEY(observed_user_id) REFERENCES users(id)
-);
+  observed_user_id VARCHAR(255) NOT NULL,
+  observer_user_id VARCHAR(255), -- a UserID,
+  UNIQUE(observed_user_id)
+) ENGINE = INNODB;
 
 -- For each of /my/ users (watcher), which possibly-remote users are they
 -- watching?
 CREATE TABLE IF NOT EXISTS presence_list(
-  user_id INTEGER NOT NULL,
-  observed_user_id TEXT, -- a UserID,
+  user_id VARCHAR(255) NOT NULL,
+  observed_user_id VARCHAR(255), -- a UserID,
   accepted BOOLEAN,
-  FOREIGN KEY(user_id) REFERENCES users(id)
-);
+  UNIQUE(user_id)
+) ENGINE = INNODB;
diff --git a/synapse/storage/schema/full_schemas/11/profiles.sql b/synapse/storage/schema/full_schemas/11/profiles.sql
index f06a528b4d..32defe2f79 100644
--- a/synapse/storage/schema/full_schemas/11/profiles.sql
+++ b/synapse/storage/schema/full_schemas/11/profiles.sql
@@ -13,8 +13,8 @@
  * limitations under the License.
  */
 CREATE TABLE IF NOT EXISTS profiles(
-    user_id INTEGER NOT NULL,
-    displayname TEXT,
-    avatar_url TEXT,
-    FOREIGN KEY(user_id) REFERENCES users(id)
-);
+    user_id VARCHAR(255) NOT NULL,
+    displayname VARCHAR(255),
+    avatar_url VARCHAR(255),
+    UNIQUE(user_id)
+) ENGINE = INNODB;
diff --git a/synapse/storage/schema/full_schemas/11/redactions.sql b/synapse/storage/schema/full_schemas/11/redactions.sql
index 5011d95db8..ba93e860f6 100644
--- a/synapse/storage/schema/full_schemas/11/redactions.sql
+++ b/synapse/storage/schema/full_schemas/11/redactions.sql
@@ -13,10 +13,10 @@
  * limitations under the License.
  */
 CREATE TABLE IF NOT EXISTS redactions (
-    event_id TEXT NOT NULL,
-    redacts TEXT NOT NULL,
-    CONSTRAINT ev_uniq UNIQUE (event_id)
-);
+    event_id VARCHAR(255) NOT NULL,
+    redacts VARCHAR(255) NOT NULL,
+    UNIQUE (event_id)
+) ENGINE = INNODB;
 
 CREATE INDEX IF NOT EXISTS redactions_event_id ON redactions (event_id);
 CREATE INDEX IF NOT EXISTS redactions_redacts ON redactions (redacts);
diff --git a/synapse/storage/schema/full_schemas/11/room_aliases.sql b/synapse/storage/schema/full_schemas/11/room_aliases.sql
index 0d2df01603..1e706aac2b 100644
--- a/synapse/storage/schema/full_schemas/11/room_aliases.sql
+++ b/synapse/storage/schema/full_schemas/11/room_aliases.sql
@@ -14,14 +14,12 @@
  */
 
 CREATE TABLE IF NOT EXISTS room_aliases(
-    room_alias TEXT NOT NULL,
-    room_id TEXT NOT NULL
-);
+    room_alias VARCHAR(255) NOT NULL,
+    room_id VARCHAR(255) NOT NULL,
+    UNIQUE (room_alias)
+) ENGINE = INNODB;
 
 CREATE TABLE IF NOT EXISTS room_alias_servers(
-    room_alias TEXT NOT NULL,
-    server TEXT NOT NULL
-);
-
-
-
+    room_alias VARCHAR(255) NOT NULL,
+    server VARCHAR(255) NOT NULL
+) ENGINE = INNODB;
diff --git a/synapse/storage/schema/full_schemas/11/state.sql b/synapse/storage/schema/full_schemas/11/state.sql
index 1fe8f1e430..be9dc2920d 100644
--- a/synapse/storage/schema/full_schemas/11/state.sql
+++ b/synapse/storage/schema/full_schemas/11/state.sql
@@ -14,34 +14,27 @@
  */
 
 CREATE TABLE IF NOT EXISTS state_groups(
-    id INTEGER PRIMARY KEY,
-    room_id TEXT NOT NULL,
-    event_id TEXT NOT NULL
-);
+    id VARCHAR(20) PRIMARY KEY,
+    room_id VARCHAR(255) NOT NULL,
+    event_id VARCHAR(255) NOT NULL
+) ENGINE = INNODB;
 
 CREATE TABLE IF NOT EXISTS state_groups_state(
-    state_group INTEGER NOT NULL,
-    room_id TEXT NOT NULL,
-    type TEXT NOT NULL,
-    state_key TEXT NOT NULL,
-    event_id TEXT NOT NULL
-);
+    state_group VARCHAR(20) NOT NULL,
+    room_id VARCHAR(255) NOT NULL,
+    type VARCHAR(255) NOT NULL,
+    state_key VARCHAR(255) NOT NULL,
+    event_id VARCHAR(255) NOT NULL
+) ENGINE = INNODB;
 
 CREATE TABLE IF NOT EXISTS event_to_state_groups(
-    event_id TEXT NOT NULL,
-    state_group INTEGER NOT NULL,
-    CONSTRAINT event_to_state_groups_uniq UNIQUE (event_id)
-);
+    event_id VARCHAR(255) NOT NULL,
+    state_group VARCHAR(255) NOT NULL,
+    UNIQUE (event_id)
+) ENGINE = INNODB;
 
 CREATE INDEX IF NOT EXISTS state_groups_id ON state_groups(id);
 
-CREATE INDEX IF NOT EXISTS state_groups_state_id ON state_groups_state(
-    state_group
-);
-CREATE INDEX IF NOT EXISTS state_groups_state_tuple ON state_groups_state(
-    room_id, type, state_key
-);
-
-CREATE INDEX IF NOT EXISTS event_to_state_groups_id ON event_to_state_groups(
-    event_id
-);
\ No newline at end of file
+CREATE INDEX IF NOT EXISTS state_groups_state_id ON state_groups_state(state_group);
+CREATE INDEX IF NOT EXISTS state_groups_state_tuple ON state_groups_state(room_id, type, state_key);
+CREATE INDEX IF NOT EXISTS event_to_state_groups_id ON event_to_state_groups(event_id);
\ No newline at end of file
diff --git a/synapse/storage/schema/full_schemas/11/transactions.sql b/synapse/storage/schema/full_schemas/11/transactions.sql
index 2d30f99b06..0570bf95d9 100644
--- a/synapse/storage/schema/full_schemas/11/transactions.sql
+++ b/synapse/storage/schema/full_schemas/11/transactions.sql
@@ -14,34 +14,31 @@
  */
 -- Stores what transaction ids we have received and what our response was
 CREATE TABLE IF NOT EXISTS received_transactions(
-    transaction_id TEXT, 
-    origin TEXT, 
-    ts INTEGER,
+    transaction_id VARCHAR(255),
+    origin VARCHAR(255),
+    ts BIGINT,
     response_code INTEGER,
-    response_json TEXT,
+    response_json BLOB,
     has_been_referenced BOOL default 0, -- Whether thishas been referenced by a prev_tx
-    CONSTRAINT uniquesss UNIQUE (transaction_id, origin) ON CONFLICT REPLACE
-);
+    UNIQUE (transaction_id, origin)
+) ENGINE = INNODB;
 
-CREATE UNIQUE INDEX IF NOT EXISTS transactions_txid ON received_transactions(transaction_id, origin);
 CREATE INDEX IF NOT EXISTS transactions_have_ref ON received_transactions(origin, has_been_referenced);-- WHERE has_been_referenced = 0;
 
 
 -- Stores what transactions we've sent, what their response was (if we got one) and whether we have
 -- since referenced the transaction in another outgoing transaction
 CREATE TABLE IF NOT EXISTS sent_transactions(
-    id INTEGER PRIMARY KEY AUTOINCREMENT, -- This is used to apply insertion ordering
-    transaction_id TEXT,
-    destination TEXT,
+    id BIGINT PRIMARY KEY, -- This is used to apply insertion ordering
+    transaction_id VARCHAR(255),
+    destination VARCHAR(255),
     response_code INTEGER DEFAULT 0,
-    response_json TEXT,
-    ts INTEGER
-);
+    response_json BLOB,
+    ts BIGINT
+) ENGINE = INNODB;
 
 CREATE INDEX IF NOT EXISTS sent_transaction_dest ON sent_transactions(destination);
-CREATE INDEX IF NOT EXISTS sent_transaction_dest_referenced ON sent_transactions(
-    destination
-);
+CREATE INDEX IF NOT EXISTS sent_transaction_dest_referenced ON sent_transactions(destination);
 CREATE INDEX IF NOT EXISTS sent_transaction_txn_id ON sent_transactions(transaction_id);
 -- So that we can do an efficient look up of all transactions that have yet to be successfully
 -- sent.
@@ -51,18 +48,17 @@ CREATE INDEX IF NOT EXISTS sent_transaction_sent ON sent_transactions(response_c
 -- For sent transactions only.
 CREATE TABLE IF NOT EXISTS transaction_id_to_pdu(
     transaction_id INTEGER,
-    destination TEXT,
-    pdu_id TEXT,
-    pdu_origin TEXT
-);
+    destination VARCHAR(255),
+    pdu_id VARCHAR(255),
+    pdu_origin VARCHAR(255),
+    UNIQUE (transaction_id, destination)
+) ENGINE = INNODB;
 
-CREATE INDEX IF NOT EXISTS transaction_id_to_pdu_tx ON transaction_id_to_pdu(transaction_id, destination);
 CREATE INDEX IF NOT EXISTS transaction_id_to_pdu_dest ON transaction_id_to_pdu(destination);
-CREATE INDEX IF NOT EXISTS transaction_id_to_pdu_index ON transaction_id_to_pdu(transaction_id, destination);
 
 -- To track destination health
 CREATE TABLE IF NOT EXISTS destinations(
-    destination TEXT PRIMARY KEY,
-    retry_last_ts INTEGER,
+    destination VARCHAR(255) PRIMARY KEY,
+    retry_last_ts BIGINT,
     retry_interval INTEGER
-);
+) ENGINE = INNODB;
diff --git a/synapse/storage/schema/full_schemas/11/users.sql b/synapse/storage/schema/full_schemas/11/users.sql
index 08ccfdac0a..55bffb22f3 100644
--- a/synapse/storage/schema/full_schemas/11/users.sql
+++ b/synapse/storage/schema/full_schemas/11/users.sql
@@ -13,33 +13,30 @@
  * limitations under the License.
  */
 CREATE TABLE IF NOT EXISTS users(
-    id INTEGER PRIMARY KEY AUTOINCREMENT,
-    name TEXT,
-    password_hash TEXT,
-    creation_ts INTEGER,
+    name VARCHAR(255),
+    password_hash VARBINARY(255),
+    creation_ts BIGINT,
     admin BOOL DEFAULT 0 NOT NULL,
-    UNIQUE(name) ON CONFLICT ROLLBACK
-);
+    UNIQUE(name)
+) ENGINE = INNODB;
 
 CREATE TABLE IF NOT EXISTS access_tokens(
-    id INTEGER PRIMARY KEY AUTOINCREMENT,
-    user_id INTEGER NOT NULL,
-    device_id TEXT,
-    token TEXT NOT NULL,
-    last_used INTEGER,
-    FOREIGN KEY(user_id) REFERENCES users(id),
-    UNIQUE(token) ON CONFLICT ROLLBACK
-);
+    id INTEGER PRIMARY KEY AUTO_INCREMENT,
+    user_id VARCHAR(255) NOT NULL,
+    device_id VARCHAR(255),
+    token VARCHAR(255) NOT NULL,
+    last_used BIGINT,
+    UNIQUE(token)
+) ENGINE = INNODB;
 
 CREATE TABLE IF NOT EXISTS user_ips (
-    user TEXT NOT NULL,
-    access_token TEXT NOT NULL,
-    device_id TEXT,
-    ip TEXT NOT NULL,
-    user_agent TEXT NOT NULL,
-    last_seen INTEGER NOT NULL,
-    CONSTRAINT user_ip UNIQUE (user, access_token, ip, user_agent) ON CONFLICT REPLACE
-);
+    user VARCHAR(255) NOT NULL,
+    access_token VARCHAR(255) NOT NULL,
+    device_id VARCHAR(255),
+    ip VARCHAR(255) NOT NULL,
+    user_agent VARCHAR(255) NOT NULL,
+    last_seen BIGINT NOT NULL,
+    UNIQUE (user, access_token, ip, user_agent)
+) ENGINE = INNODB;
 
 CREATE INDEX IF NOT EXISTS user_ips_user ON user_ips(user);
-
diff --git a/synapse/storage/schema/schema_version.sql b/synapse/storage/schema/schema_version.sql
index 0431e2d051..28762861e5 100644
--- a/synapse/storage/schema/schema_version.sql
+++ b/synapse/storage/schema/schema_version.sql
@@ -14,17 +14,16 @@
  */
 
 CREATE TABLE IF NOT EXISTS schema_version(
-    Lock char(1) NOT NULL DEFAULT 'X',  -- Makes sure this table only has one row.
-    version INTEGER NOT NULL,
-    upgraded BOOL NOT NULL,  -- Whether we reached this version from an upgrade or an initial schema.
-    CONSTRAINT schema_version_lock_x CHECK (Lock='X')
-    CONSTRAINT schema_version_lock_uniq UNIQUE (Lock)
+    `Lock` CHAR(1) NOT NULL DEFAULT 'X' UNIQUE,  -- Makes sure this table only has one row.
+    `version` INTEGER NOT NULL,
+    `upgraded` BOOL NOT NULL,  -- Whether we reached this version from an upgrade or an initial schema.
+    CHECK (`Lock`='X')
 );
 
 CREATE TABLE IF NOT EXISTS applied_schema_deltas(
-    version INTEGER NOT NULL,
-    file TEXT NOT NULL,
-    CONSTRAINT schema_deltas_ver_file UNIQUE (version, file) ON CONFLICT IGNORE
+    `version` INTEGER NOT NULL,
+    `file` VARCHAR(255) NOT NULL,
+    UNIQUE(version, file)
 );
 
 CREATE INDEX IF NOT EXISTS schema_deltas_ver ON applied_schema_deltas(version);
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index 58dbf2802b..721f2862c4 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -17,6 +17,8 @@ from ._base import SQLBaseStore
 
 from twisted.internet import defer
 
+from synapse.util.stringutils import random_string
+
 import logging
 
 logger = logging.getLogger(__name__)
@@ -91,14 +93,15 @@ class StateStore(SQLBaseStore):
 
         state_group = context.state_group
         if not state_group:
+            group = _make_group_id(self._clock)
             state_group = self._simple_insert_txn(
                 txn,
                 table="state_groups",
                 values={
+                    "id": group,
                     "room_id": event.room_id,
                     "event_id": event.event_id,
                 },
-                or_ignore=True,
             )
 
             for state in state_events.values():
@@ -112,7 +115,6 @@ class StateStore(SQLBaseStore):
                         "state_key": state.state_key,
                         "event_id": state.event_id,
                     },
-                    or_ignore=True,
                 )
 
         self._simple_insert_txn(
@@ -154,3 +156,8 @@ class StateStore(SQLBaseStore):
 
         events = yield self._parse_events(results)
         defer.returnValue(events)
+
+
+def _make_group_id(clock):
+    return str(int(clock.time_msec())) + random_string(5)
+
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index 66f307e640..3a310cd003 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -110,7 +110,7 @@ class _StreamToken(namedtuple("_StreamToken", "topological stream")):
         if self.topological is None:
             return "(%d < %s)" % (self.stream, "stream_ordering")
         else:
-            return "(%d < %s OR (%d == %s AND %d < %s))" % (
+            return "(%d < %s OR (%d = %s AND %d < %s))" % (
                 self.topological, "topological_ordering",
                 self.topological, "topological_ordering",
                 self.stream, "stream_ordering",
@@ -120,7 +120,7 @@ class _StreamToken(namedtuple("_StreamToken", "topological stream")):
         if self.topological is None:
             return "(%d >= %s)" % (self.stream, "stream_ordering")
         else:
-            return "(%d > %s OR (%d == %s AND %d >= %s))" % (
+            return "(%d > %s OR (%d = %s AND %d >= %s))" % (
                 self.topological, "topological_ordering",
                 self.topological, "topological_ordering",
                 self.stream, "stream_ordering",
diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py
index b777395e06..7d22392444 100644
--- a/synapse/storage/transactions.py
+++ b/synapse/storage/transactions.py
@@ -125,8 +125,8 @@ class TransactionStore(SQLBaseStore):
                 SentTransactions.select_statement("destination = ?"),
             )
 
-        results = txn.execute(query, (destination,))
-        results = SentTransactions.decode_results(results)
+        txn.execute(query, (destination,))
+        results = SentTransactions.decode_results(txn.fetchall())
 
         prev_txns = [r.transaction_id for r in results]
 
@@ -134,7 +134,7 @@ class TransactionStore(SQLBaseStore):
 
         query = SentTransactions.insert_statement()
         txn.execute(query, SentTransactions.EntryType(
-            None,
+            self.get_next_stream_id(),
             transaction_id=transaction_id,
             destination=destination,
             ts=origin_server_ts,
@@ -270,7 +270,7 @@ class TransactionStore(SQLBaseStore):
                                        retry_last_ts, retry_interval):
 
         query = (
-            "INSERT OR REPLACE INTO %s "
+            "REPLACE INTO %s "
             "(destination, retry_last_ts, retry_interval) "
             "VALUES (?, ?, ?) "
         ) % DestinationsTable.table_name