summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rwxr-xr-xsynapse/app/homeserver.py20
-rw-r--r--synapse/storage/__init__.py43
-rw-r--r--synapse/storage/_base.py30
-rw-r--r--synapse/storage/engines/__init__.py35
-rw-r--r--synapse/storage/engines/maria.py30
-rw-r--r--synapse/storage/engines/sqlite3.py25
-rw-r--r--synapse/storage/keys.py4
-rw-r--r--synapse/storage/registration.py8
-rw-r--r--synapse/storage/schema/delta/12/v12.sql8
-rw-r--r--synapse/storage/schema/delta/13/v13.sql4
-rw-r--r--synapse/storage/schema/delta/14/v14.sql2
-rw-r--r--synapse/storage/schema/full_schemas/11/event_edges.sql14
-rw-r--r--synapse/storage/schema/full_schemas/11/event_signatures.sql8
-rw-r--r--synapse/storage/schema/full_schemas/11/im.sql18
-rw-r--r--synapse/storage/schema/full_schemas/11/keys.sql4
-rw-r--r--synapse/storage/schema/full_schemas/11/media_repository.sql8
-rw-r--r--synapse/storage/schema/full_schemas/11/presence.sql6
-rw-r--r--synapse/storage/schema/full_schemas/11/profiles.sql4
-rw-r--r--synapse/storage/schema/full_schemas/11/redactions.sql2
-rw-r--r--synapse/storage/schema/full_schemas/11/room_aliases.sql8
-rw-r--r--synapse/storage/schema/full_schemas/11/state.sql6
-rw-r--r--synapse/storage/schema/full_schemas/11/transactions.sql8
-rw-r--r--synapse/storage/schema/full_schemas/11/users.sql10
-rw-r--r--synapse/storage/signatures.py8
-rw-r--r--synapse/storage/stream.py6
-rw-r--r--synapse/util/retryutils.py2
26 files changed, 213 insertions, 108 deletions
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index beab6ffc7a..b185b2f569 100755
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -20,6 +20,7 @@ sys.dont_write_bytecode = True
 from synapse.storage import (
     prepare_database, prepare_sqlite3_database, UpgradeDatabaseException,
 )
+from synapse.storage.engines import create_engine
 
 from synapse.server import HomeServer
 
@@ -376,7 +377,7 @@ def setup(config_options):
     if name in ["MySQLdb", "mysql.connector"]:
         db_config.setdefault("args", {}).update({
             "sql_mode": "TRADITIONAL",
-            "charset": "utf8",
+            "charset": "utf8mb4",
             "use_unicode": True,
         })
     elif name == "sqlite3":
@@ -388,6 +389,8 @@ def setup(config_options):
     else:
         raise RuntimeError("Unsupported database type '%s'" % (name,))
 
+    database_engine = create_engine(name)
+
     hs = SynapseHomeServer(
         config.server_name,
         domain_with_port=domain_with_port,
@@ -398,6 +401,7 @@ def setup(config_options):
         config=config,
         content_addr=config.content_addr,
         version_string=version_string,
+        database_engine=database_engine,
     )
 
     hs.create_resource_tree(
@@ -409,12 +413,14 @@ def setup(config_options):
     logger.info("Preparing database: %s...", db_name)
 
     try:
-        # with sqlite3.connect(db_name) as db_conn:
-        #     prepare_sqlite3_database(db_conn)
-        #     prepare_database(db_conn)
-        import mysql.connector
-        db_conn = mysql.connector.connect(**db_config.get("args", {}))
-        prepare_database(db_conn)
+        db_conn = database_engine.module.connect(**db_config.get("args", {}))
+
+        if name == "sqlite3":
+            prepare_sqlite3_database(db_conn)
+
+        prepare_database(db_conn, database_engine)
+
+        db_conn.commit()
     except UpgradeDatabaseException:
         sys.stderr.write(
             "\nFailed to upgrade database.\n"
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index abde7d0df5..f8053484cf 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -77,9 +77,6 @@ class DataStore(RoomMemberStore, RoomStore,
         self.min_token_deferred = self._get_min_token()
         self.min_token = None
 
-        self._next_stream_id_lock = threading.Lock()
-        self._next_stream_id = int(hs.get_clock().time_msec()) * 1000
-
     def insert_client_ip(self, user, access_token, device_id, ip, user_agent):
         return self._simple_upsert(
             "user_ips",
@@ -127,19 +124,21 @@ class UpgradeDatabaseException(PrepareDatabaseException):
     pass
 
 
-def prepare_database(db_conn):
+def prepare_database(db_conn, database_engine):
     """Prepares a database for usage. Will either create all necessary tables
     or upgrade from an older schema version.
     """
     try:
         cur = db_conn.cursor()
-        version_info = _get_or_create_schema_state(cur)
+        version_info = _get_or_create_schema_state(cur, database_engine)
 
         if version_info:
             user_version, delta_files, upgraded = version_info
-            _upgrade_existing_database(cur, user_version, delta_files, upgraded)
+            _upgrade_existing_database(
+                cur, user_version, delta_files, upgraded, database_engine
+            )
         else:
-            _setup_new_database(cur)
+            _setup_new_database(cur, database_engine)
 
         # cur.execute("PRAGMA user_version = %d" % (SCHEMA_VERSION,))
 
@@ -150,7 +149,7 @@ def prepare_database(db_conn):
         raise
 
 
-def _setup_new_database(cur):
+def _setup_new_database(cur, database_engine):
     """Sets up the database by finding a base set of "full schemas" and then
     applying any necessary deltas.
 
@@ -210,7 +209,7 @@ def _setup_new_database(cur):
         executescript(cur, sql_loc)
 
     cur.execute(
-        _convert_param_style(
+        database_engine.convert_param_style(
             "REPLACE INTO schema_version (version, upgraded)"
             " VALUES (?,?)"
         ),
@@ -221,12 +220,13 @@ def _setup_new_database(cur):
         cur,
         current_version=max_current_ver,
         applied_delta_files=[],
-        upgraded=False
+        upgraded=False,
+        database_engine=database_engine,
     )
 
 
 def _upgrade_existing_database(cur, current_version, applied_delta_files,
-                               upgraded):
+                               upgraded, database_engine):
     """Upgrades an existing database.
 
     Delta files can either be SQL stored in *.sql files, or python modules
@@ -335,26 +335,22 @@ def _upgrade_existing_database(cur, current_version, applied_delta_files,
 
             # Mark as done.
             cur.execute(
-                _convert_param_style(
+                database_engine.convert_param_style(
                     "INSERT INTO applied_schema_deltas (version, file)"
-                    " VALUES (?,?)"
+                    " VALUES (?,?)",
                 ),
                 (v, relative_path)
             )
 
             cur.execute(
-                _convert_param_style(
+                database_engine.convert_param_style(
                     "REPLACE INTO schema_version (version, upgraded)"
-                    " VALUES (?,?)"
+                    " VALUES (?,?)",
                 ),
                 (v, True)
             )
 
 
-def _convert_param_style(sql):
-    return sql.replace("?", "%s")
-
-
 def get_statements(f):
     statement_buffer = ""
     in_comment = False  # If we're in a /* ... */ style comment
@@ -409,7 +405,7 @@ def executescript(txn, schema_path):
             txn.execute(statement)
 
 
-def _get_or_create_schema_state(txn):
+def _get_or_create_schema_state(txn, database_engine):
     try:
         # Bluntly try creating the schema_version tables.
         schema_path = os.path.join(
@@ -426,7 +422,7 @@ def _get_or_create_schema_state(txn):
 
     if current_version:
         txn.execute(
-            _convert_param_style(
+            database_engine.convert_param_style(
                 "SELECT file FROM applied_schema_deltas WHERE version >= ?"
             ),
             (current_version,)
@@ -446,6 +442,8 @@ def prepare_sqlite3_database(db_conn):
     new. This only affects sqlite databases since they were the only ones
     supported at the time.
     """
+    import sqlite3
+
     with db_conn:
         schema_path = os.path.join(
             dir_path, "schema", "schema_version.sql",
@@ -466,7 +464,8 @@ def prepare_sqlite3_database(db_conn):
                 db_conn.execute(
                     _convert_param_style(
                         "REPLACE INTO schema_version (version, upgraded)"
-                        " VALUES (?,?)"
+                        " VALUES (?,?)",
+                        sqlite3
                     ),
                     (row[0], False)
                 )
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 76ec3ee93f..047d100f46 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -29,6 +29,7 @@ import functools
 import simplejson as json
 import sys
 import time
+import threading
 
 
 logger = logging.getLogger(__name__)
@@ -118,19 +119,16 @@ def cached(max_entries=1000, num_args=1):
     return wrap
 
 
-def _convert_param_style(sql):
-    return sql.replace("?", "%s")
-
-
 class LoggingTransaction(object):
     """An object that almost-transparently proxies for the 'txn' object
     passed to the constructor. Adds logging and metrics to the .execute()
     method."""
-    __slots__ = ["txn", "name"]
+    __slots__ = ["txn", "name", "database_engine"]
 
-    def __init__(self, txn, name):
+    def __init__(self, txn, name, database_engine):
         object.__setattr__(self, "txn", txn)
         object.__setattr__(self, "name", name)
+        object.__setattr__(self, "database_engine", database_engine)
 
     def __getattr__(self, name):
         return getattr(self.txn, name)
@@ -142,7 +140,7 @@ class LoggingTransaction(object):
         # TODO(paul): Maybe use 'info' and 'debug' for values?
         sql_logger.debug("[SQL] {%s} %s", self.name, sql)
 
-        sql = _convert_param_style(sql)
+        sql = self.database_engine.convert_param_style(sql)
 
         try:
             if args and args[0]:
@@ -227,9 +225,14 @@ class SQLBaseStore(object):
 
         self._get_event_cache = LruCache(hs.config.event_cache_size)
 
+        self.database_engine = hs.database_engine
+
         # Pretend the getEventCache is just another named cache
         caches_by_name["*getEvent*"] = self._get_event_cache
 
+        self._next_stream_id_lock = threading.Lock()
+        self._next_stream_id = int(hs.get_clock().time_msec()) * 1000
+
     def start_profiling(self):
         self._previous_loop_ts = self._clock.time_msec()
 
@@ -281,7 +284,10 @@ class SQLBaseStore(object):
                 sql_scheduling_timer.inc_by(time.time() * 1000 - start_time)
                 transaction_logger.debug("[TXN START] {%s}", name)
                 try:
-                    return func(LoggingTransaction(txn, name), *args, **kwargs)
+                    return func(
+                        LoggingTransaction(txn, name, self.database_engine),
+                        *args, **kwargs
+                    )
                 except:
                     logger.exception("[TXN FAIL] {%s}", name)
                     raise
@@ -588,7 +594,7 @@ class SQLBaseStore(object):
         select_sql = "SELECT %s FROM %s WHERE %s" % (
             ", ".join(retcols),
             table,
-            " AND ".join("%s = ?" % (k) for k in keyvalues)
+            " AND ".join("%s = ?" % (k,) for k in keyvalues)
         )
 
         txn.execute(select_sql, keyvalues.values())
@@ -836,6 +842,12 @@ class SQLBaseStore(object):
         result = txn.fetchone()
         return result[0] if result else None
 
+    def get_next_stream_id(self):
+        with self._next_stream_id_lock:
+            i = self._next_stream_id
+            self._next_stream_id += 1
+            return i
+
 
 class _RollbackButIsFineException(Exception):
     """ This exception is used to rollback a transaction without implying
diff --git a/synapse/storage/engines/__init__.py b/synapse/storage/engines/__init__.py
new file mode 100644
index 0000000000..709b6f88ac
--- /dev/null
+++ b/synapse/storage/engines/__init__.py
@@ -0,0 +1,35 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from .maria import MariaEngine
+from .sqlite3 import Sqlite3Engine
+
+
+SUPPORTED_MODULE = {
+    "sqlite3": Sqlite3Engine,
+    "mysql.connector": MariaEngine,
+}
+
+
+def create_engine(name):
+    engine_class = SUPPORTED_MODULE.get(name, None)
+
+    if engine_class:
+        module = __import__(name)
+        return engine_class(module)
+
+    raise RuntimeError(
+        "Unsupported database engine '%s'" % (name,)
+    )
diff --git a/synapse/storage/engines/maria.py b/synapse/storage/engines/maria.py
new file mode 100644
index 0000000000..df47763647
--- /dev/null
+++ b/synapse/storage/engines/maria.py
@@ -0,0 +1,30 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import types
+
+
+class MariaEngine(object):
+    def __init__(self, database_module):
+        self.module = database_module
+
+    def convert_param_style(self, sql):
+        return sql.replace("?", "%s")
+
+    def encode_parameter(self, param):
+        if isinstance(param, types.BufferType):
+            return str(param)
+        return param
diff --git a/synapse/storage/engines/sqlite3.py b/synapse/storage/engines/sqlite3.py
new file mode 100644
index 0000000000..639cdea41d
--- /dev/null
+++ b/synapse/storage/engines/sqlite3.py
@@ -0,0 +1,25 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+class Sqlite3Engine(object):
+    def __init__(self, database_module):
+        self.module = database_module
+
+    def convert_param_style(self, sql):
+        return sql
+
+    def encode_parameter(self, param):
+        return param
diff --git a/synapse/storage/keys.py b/synapse/storage/keys.py
index e6975a945b..25fef79434 100644
--- a/synapse/storage/keys.py
+++ b/synapse/storage/keys.py
@@ -64,7 +64,7 @@ class KeyStore(SQLBaseStore):
                 "fingerprint": fingerprint,
                 "from_server": from_server,
                 "ts_added_ms": time_now_ms,
-                "tls_certificate": tls_certificate_bytes,
+                "tls_certificate": buffer(tls_certificate_bytes),
             },
         )
 
@@ -113,6 +113,6 @@ class KeyStore(SQLBaseStore):
                 "key_id": "%s:%s" % (verify_key.alg, verify_key.version),
                 "from_server": from_server,
                 "ts_added_ms": time_now_ms,
-                "verify_key": verify_key.encode(),
+                "verify_key": buffer(verify_key.encode()),
             },
         )
diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py
index 7258f7b2a5..0c785ec989 100644
--- a/synapse/storage/registration.py
+++ b/synapse/storage/registration.py
@@ -42,6 +42,7 @@ class RegistrationStore(SQLBaseStore):
         yield self._simple_insert(
             "access_tokens",
             {
+                "id": self.get_next_stream_id(),
                 "user_id": user_id,
                 "token": token
             },
@@ -78,8 +79,11 @@ class RegistrationStore(SQLBaseStore):
 
         # it's possible for this to get a conflict, but only for a single user
         # since tokens are namespaced based on their user ID
-        txn.execute("INSERT INTO access_tokens(user_id, token) " +
-                    "VALUES (?,?)", [user_id, token])
+        txn.execute(
+            "INSERT INTO access_tokens(id, user_id, token)"
+            " VALUES (?,?,?)",
+            (self.get_next_stream_id(), user_id, token,)
+        )
 
     @defer.inlineCallbacks
     def get_user_by_id(self, user_id):
diff --git a/synapse/storage/schema/delta/12/v12.sql b/synapse/storage/schema/delta/12/v12.sql
index b526109e6e..90ac474859 100644
--- a/synapse/storage/schema/delta/12/v12.sql
+++ b/synapse/storage/schema/delta/12/v12.sql
@@ -18,7 +18,7 @@ CREATE TABLE IF NOT EXISTS rejections(
     reason VARCHAR(255) NOT NULL,
     last_check VARCHAR(255) NOT NULL,
     UNIQUE (event_id)
-) ENGINE = INNODB;
+) ;
 
 -- Push notification endpoints that users have configured
 CREATE TABLE IF NOT EXISTS pushers (
@@ -37,7 +37,7 @@ CREATE TABLE IF NOT EXISTS pushers (
   last_success BIGINT,
   failing_since BIGINT,
   UNIQUE (app_id, pushkey)
-) ENGINE = INNODB;
+) ;
 
 CREATE TABLE IF NOT EXISTS push_rules (
   id BIGINT PRIMARY KEY,
@@ -48,7 +48,7 @@ CREATE TABLE IF NOT EXISTS push_rules (
   conditions VARCHAR(255) NOT NULL,
   actions VARCHAR(255) NOT NULL,
   UNIQUE(user_name, rule_id)
-) ENGINE = INNODB;
+) ;
 
 CREATE INDEX IF NOT EXISTS push_rules_user_name on push_rules (user_name);
 
@@ -56,7 +56,7 @@ CREATE TABLE IF NOT EXISTS user_filters(
   user_id VARCHAR(255),
   filter_id BIGINT,
   filter_json BLOB
-) ENGINE = INNODB;
+) ;
 
 CREATE INDEX IF NOT EXISTS user_filters_by_user_id_filter_id ON user_filters(
     user_id, filter_id
diff --git a/synapse/storage/schema/delta/13/v13.sql b/synapse/storage/schema/delta/13/v13.sql
index f0a5daf445..4953b6323c 100644
--- a/synapse/storage/schema/delta/13/v13.sql
+++ b/synapse/storage/schema/delta/13/v13.sql
@@ -20,7 +20,7 @@ CREATE TABLE IF NOT EXISTS application_services(
     hs_token VARCHAR(255),
     sender VARCHAR(255),
     UNIQUE(token)
-) ENGINE = INNODB;
+) ;
 
 CREATE TABLE IF NOT EXISTS application_services_regex(
     id BIGINT PRIMARY KEY,
@@ -28,4 +28,4 @@ CREATE TABLE IF NOT EXISTS application_services_regex(
     namespace INTEGER,  /* enum[room_id|room_alias|user_id] */
     regex VARCHAR(255),
     FOREIGN KEY(as_id) REFERENCES application_services(id)
-) ENGINE = INNODB;
+) ;
diff --git a/synapse/storage/schema/delta/14/v14.sql b/synapse/storage/schema/delta/14/v14.sql
index a1260c5c1f..3bda073c94 100644
--- a/synapse/storage/schema/delta/14/v14.sql
+++ b/synapse/storage/schema/delta/14/v14.sql
@@ -4,6 +4,6 @@ CREATE TABLE IF NOT EXISTS push_rules_enable (
   rule_id VARCHAR(255) NOT NULL,
   enabled TINYINT,
   UNIQUE(user_name, rule_id)
-) ENGINE = INNODB;
+) ;
 
 CREATE INDEX IF NOT EXISTS push_rules_enable_user_name on push_rules_enable (user_name);
diff --git a/synapse/storage/schema/full_schemas/11/event_edges.sql b/synapse/storage/schema/full_schemas/11/event_edges.sql
index 0f53488e92..336cd563df 100644
--- a/synapse/storage/schema/full_schemas/11/event_edges.sql
+++ b/synapse/storage/schema/full_schemas/11/event_edges.sql
@@ -17,7 +17,7 @@ CREATE TABLE IF NOT EXISTS event_forward_extremities(
     event_id VARCHAR(255) NOT NULL,
     room_id VARCHAR(255) NOT NULL,
     UNIQUE (event_id, room_id)
-) ENGINE = INNODB;
+) ;
 
 CREATE INDEX IF NOT EXISTS ev_extrem_room ON event_forward_extremities(room_id);
 CREATE INDEX IF NOT EXISTS ev_extrem_id ON event_forward_extremities(event_id);
@@ -27,7 +27,7 @@ CREATE TABLE IF NOT EXISTS event_backward_extremities(
     event_id VARCHAR(255) NOT NULL,
     room_id VARCHAR(255) NOT NULL,
     UNIQUE (event_id, room_id)
-) ENGINE = INNODB;
+) ;
 
 CREATE INDEX IF NOT EXISTS ev_b_extrem_room ON event_backward_extremities(room_id);
 CREATE INDEX IF NOT EXISTS ev_b_extrem_id ON event_backward_extremities(event_id);
@@ -39,7 +39,7 @@ CREATE TABLE IF NOT EXISTS event_edges(
     room_id VARCHAR(255) NOT NULL,
     is_state BOOL NOT NULL,
     UNIQUE (event_id, prev_event_id, room_id, is_state)
-) ENGINE = INNODB;
+) ;
 
 CREATE INDEX IF NOT EXISTS ev_edges_id ON event_edges(event_id);
 CREATE INDEX IF NOT EXISTS ev_edges_prev_id ON event_edges(prev_event_id);
@@ -49,7 +49,7 @@ CREATE TABLE IF NOT EXISTS room_depth(
     room_id VARCHAR(255) NOT NULL,
     min_depth INTEGER NOT NULL,
     UNIQUE (room_id)
-) ENGINE = INNODB;
+) ;
 
 CREATE INDEX IF NOT EXISTS room_depth_room ON room_depth(room_id);
 
@@ -59,7 +59,7 @@ create TABLE IF NOT EXISTS event_destinations(
     destination VARCHAR(255) NOT NULL,
     delivered_ts BIGINT DEFAULT 0, -- or 0 if not delivered
     UNIQUE (event_id, destination)
-) ENGINE = INNODB;
+) ;
 
 CREATE INDEX IF NOT EXISTS event_destinations_id ON event_destinations(event_id);
 
@@ -70,7 +70,7 @@ CREATE TABLE IF NOT EXISTS state_forward_extremities(
     type VARCHAR(255) NOT NULL,
     state_key VARCHAR(255) NOT NULL,
     UNIQUE (event_id, room_id)
-) ENGINE = INNODB;
+) ;
 
 CREATE INDEX IF NOT EXISTS st_extrem_keys ON state_forward_extremities(
     room_id, type, state_key
@@ -83,7 +83,7 @@ CREATE TABLE IF NOT EXISTS event_auth(
     auth_id VARCHAR(255) NOT NULL,
     room_id VARCHAR(255) NOT NULL,
     UNIQUE (event_id, auth_id, room_id)
-) ENGINE = INNODB;
+) ;
 
 CREATE INDEX IF NOT EXISTS evauth_edges_id ON event_auth(event_id);
 CREATE INDEX IF NOT EXISTS evauth_edges_auth_id ON event_auth(auth_id);
diff --git a/synapse/storage/schema/full_schemas/11/event_signatures.sql b/synapse/storage/schema/full_schemas/11/event_signatures.sql
index 334d7c8680..11e611598b 100644
--- a/synapse/storage/schema/full_schemas/11/event_signatures.sql
+++ b/synapse/storage/schema/full_schemas/11/event_signatures.sql
@@ -18,7 +18,7 @@ CREATE TABLE IF NOT EXISTS event_content_hashes (
     algorithm VARCHAR(255),
     hash BLOB,
     UNIQUE (event_id, algorithm)
-) ENGINE = INNODB;
+) ;
 
 CREATE INDEX IF NOT EXISTS event_content_hashes_id ON event_content_hashes(event_id);
 
@@ -28,7 +28,7 @@ CREATE TABLE IF NOT EXISTS event_reference_hashes (
     algorithm VARCHAR(255),
     hash BLOB,
     UNIQUE (event_id, algorithm)
-) ENGINE = INNODB;
+) ;
 
 CREATE INDEX IF NOT EXISTS event_reference_hashes_id ON event_reference_hashes(event_id);
 
@@ -39,7 +39,7 @@ CREATE TABLE IF NOT EXISTS event_signatures (
     key_id VARCHAR(255),
     signature BLOB,
     UNIQUE (event_id, signature_name, key_id)
-) ENGINE = INNODB;
+) ;
 
 CREATE INDEX IF NOT EXISTS event_signatures_id ON event_signatures(event_id);
 
@@ -50,6 +50,6 @@ CREATE TABLE IF NOT EXISTS event_edge_hashes(
     algorithm VARCHAR(255),
     hash BLOB,
     UNIQUE (event_id, prev_event_id, algorithm)
-) ENGINE = INNODB;
+) ;
 
 CREATE INDEX IF NOT EXISTS event_edge_hashes_id ON event_edge_hashes(event_id);
diff --git a/synapse/storage/schema/full_schemas/11/im.sql b/synapse/storage/schema/full_schemas/11/im.sql
index 9849e969be..a0fb337629 100644
--- a/synapse/storage/schema/full_schemas/11/im.sql
+++ b/synapse/storage/schema/full_schemas/11/im.sql
@@ -25,7 +25,7 @@ CREATE TABLE IF NOT EXISTS events(
     outlier BOOL NOT NULL,
     depth BIGINT DEFAULT 0 NOT NULL,
     UNIQUE (event_id)
-) ENGINE = INNODB;
+) ;
 
 CREATE INDEX IF NOT EXISTS events_stream_ordering ON events (stream_ordering);
 CREATE INDEX IF NOT EXISTS events_topological_ordering ON events (topological_ordering);
@@ -38,7 +38,7 @@ CREATE TABLE IF NOT EXISTS event_json(
     internal_metadata BLOB NOT NULL,
     json BLOB NOT NULL,
     UNIQUE (event_id)
-) ENGINE = INNODB;
+) ;
 
 CREATE INDEX IF NOT EXISTS event_json_room_id ON event_json(room_id);
 
@@ -50,7 +50,7 @@ CREATE TABLE IF NOT EXISTS state_events(
     state_key VARCHAR(255) NOT NULL,
     prev_state VARCHAR(255),
     UNIQUE (event_id)
-) ENGINE = INNODB;
+) ;
 
 CREATE INDEX IF NOT EXISTS state_events_room_id ON state_events (room_id);
 CREATE INDEX IF NOT EXISTS state_events_type ON state_events (type);
@@ -64,7 +64,7 @@ CREATE TABLE IF NOT EXISTS current_state_events(
     state_key VARCHAR(255) NOT NULL,
     UNIQUE (event_id),
     UNIQUE (room_id, type, state_key)
-) ENGINE = INNODB;
+) ;
 
 CREATE INDEX IF NOT EXISTS current_state_events_room_id ON current_state_events (room_id);
 CREATE INDEX IF NOT EXISTS current_state_events_type ON current_state_events (type);
@@ -77,7 +77,7 @@ CREATE TABLE IF NOT EXISTS room_memberships(
     room_id VARCHAR(255) NOT NULL,
     membership VARCHAR(255) NOT NULL,
     UNIQUE (event_id)
-) ENGINE = INNODB;
+) ;
 
 CREATE INDEX IF NOT EXISTS room_memberships_room_id ON room_memberships (room_id);
 CREATE INDEX IF NOT EXISTS room_memberships_user_id ON room_memberships (user_id);
@@ -89,14 +89,14 @@ CREATE TABLE IF NOT EXISTS feedback(
     sender VARCHAR(255),
     room_id VARCHAR(255),
     UNIQUE (event_id)
-) ENGINE = INNODB;
+) ;
 
 CREATE TABLE IF NOT EXISTS topics(
     event_id VARCHAR(255) NOT NULL,
     room_id VARCHAR(255) NOT NULL,
     topic VARCHAR(255) NOT NULL,
     UNIQUE (event_id)
-) ENGINE = INNODB;
+) ;
 
 CREATE INDEX IF NOT EXISTS topics_room_id ON topics(room_id);
 
@@ -113,12 +113,12 @@ CREATE TABLE IF NOT EXISTS rooms(
     room_id VARCHAR(255) PRIMARY KEY NOT NULL,
     is_public BOOL,
     creator VARCHAR(255)
-) ENGINE = INNODB;
+) ;
 
 CREATE TABLE IF NOT EXISTS room_hosts(
     room_id VARCHAR(255) NOT NULL,
     host VARCHAR(255) NOT NULL,
     UNIQUE (room_id, host)
-) ENGINE = INNODB;
+) ;
 
 CREATE INDEX IF NOT EXISTS room_hosts_room_id ON room_hosts (room_id);
diff --git a/synapse/storage/schema/full_schemas/11/keys.sql b/synapse/storage/schema/full_schemas/11/keys.sql
index c0f2ec29bb..a785cdb4c5 100644
--- a/synapse/storage/schema/full_schemas/11/keys.sql
+++ b/synapse/storage/schema/full_schemas/11/keys.sql
@@ -19,7 +19,7 @@ CREATE TABLE IF NOT EXISTS server_tls_certificates(
   ts_added_ms BIGINT, -- When the certifcate was added.
   tls_certificate BLOB, -- DER encoded x509 certificate.
   UNIQUE (server_name, fingerprint)
-) ENGINE = INNODB;
+) ;
 
 CREATE TABLE IF NOT EXISTS server_signature_keys(
   server_name VARCHAR(255), -- Server name.
@@ -28,4 +28,4 @@ CREATE TABLE IF NOT EXISTS server_signature_keys(
   ts_added_ms BIGINT, -- When the key was added.
   verify_key BLOB, -- NACL verification key.
   UNIQUE (server_name, key_id)
-) ENGINE = INNODB;
+) ;
diff --git a/synapse/storage/schema/full_schemas/11/media_repository.sql b/synapse/storage/schema/full_schemas/11/media_repository.sql
index d9559f5902..27fe297af6 100644
--- a/synapse/storage/schema/full_schemas/11/media_repository.sql
+++ b/synapse/storage/schema/full_schemas/11/media_repository.sql
@@ -21,7 +21,7 @@ CREATE TABLE IF NOT EXISTS local_media_repository (
     upload_name VARCHAR(255), -- The name the media was uploaded with.
     user_id VARCHAR(255), -- The user who uploaded the file.
     UNIQUE (media_id)
-) ENGINE = INNODB;
+) ;
 
 CREATE TABLE IF NOT EXISTS local_media_repository_thumbnails (
     media_id VARCHAR(255), -- The id used to refer to the media.
@@ -33,7 +33,7 @@ CREATE TABLE IF NOT EXISTS local_media_repository_thumbnails (
     UNIQUE (
         media_id, thumbnail_width, thumbnail_height, thumbnail_type
     )
-) ENGINE = INNODB;
+) ;
 
 CREATE INDEX IF NOT EXISTS local_media_repository_thumbnails_media_id
     ON local_media_repository_thumbnails (media_id);
@@ -47,7 +47,7 @@ CREATE TABLE IF NOT EXISTS remote_media_cache (
     media_length INTEGER, -- Length of the media in bytes.
     filesystem_id VARCHAR(255), -- The name used to store the media on disk.
     UNIQUE (media_origin, media_id)
-) ENGINE = INNODB;
+) ;
 
 CREATE TABLE IF NOT EXISTS remote_media_cache_thumbnails (
     media_origin VARCHAR(255), -- The remote HS the media came from.
@@ -62,7 +62,7 @@ CREATE TABLE IF NOT EXISTS remote_media_cache_thumbnails (
         media_origin, media_id, thumbnail_width, thumbnail_height,
         thumbnail_type
      )
-) ENGINE = INNODB;
+) ;
 
 CREATE INDEX IF NOT EXISTS remote_media_cache_thumbnails_media_id
     ON remote_media_cache_thumbnails (media_id);
diff --git a/synapse/storage/schema/full_schemas/11/presence.sql b/synapse/storage/schema/full_schemas/11/presence.sql
index 8031321083..b48b110ae9 100644
--- a/synapse/storage/schema/full_schemas/11/presence.sql
+++ b/synapse/storage/schema/full_schemas/11/presence.sql
@@ -18,7 +18,7 @@ CREATE TABLE IF NOT EXISTS presence(
   status_msg VARCHAR(255),
   mtime BIGINT, -- miliseconds since last state change
   UNIQUE(user_id)
-) ENGINE = INNODB;
+) ;
 
 -- For each of /my/ users which possibly-remote users are allowed to see their
 -- presence state
@@ -26,7 +26,7 @@ CREATE TABLE IF NOT EXISTS presence_allow_inbound(
   observed_user_id VARCHAR(255) NOT NULL,
   observer_user_id VARCHAR(255), -- a UserID,
   UNIQUE(observed_user_id)
-) ENGINE = INNODB;
+) ;
 
 -- For each of /my/ users (watcher), which possibly-remote users are they
 -- watching?
@@ -35,4 +35,4 @@ CREATE TABLE IF NOT EXISTS presence_list(
   observed_user_id VARCHAR(255), -- a UserID,
   accepted BOOLEAN,
   UNIQUE(user_id)
-) ENGINE = INNODB;
+) ;
diff --git a/synapse/storage/schema/full_schemas/11/profiles.sql b/synapse/storage/schema/full_schemas/11/profiles.sql
index 552645c56f..92da48f97e 100644
--- a/synapse/storage/schema/full_schemas/11/profiles.sql
+++ b/synapse/storage/schema/full_schemas/11/profiles.sql
@@ -14,7 +14,7 @@
  */
 CREATE TABLE IF NOT EXISTS profiles(
     user_id VARCHAR(255) NOT NULL,
-    displayname VARBINARY(255),
+    displayname VARCHAR(255),
     avatar_url VARCHAR(255),
     UNIQUE(user_id)
-) ENGINE = INNODB;
+) ;
diff --git a/synapse/storage/schema/full_schemas/11/redactions.sql b/synapse/storage/schema/full_schemas/11/redactions.sql
index ba93e860f6..9b52a2012a 100644
--- a/synapse/storage/schema/full_schemas/11/redactions.sql
+++ b/synapse/storage/schema/full_schemas/11/redactions.sql
@@ -16,7 +16,7 @@ CREATE TABLE IF NOT EXISTS redactions (
     event_id VARCHAR(255) NOT NULL,
     redacts VARCHAR(255) NOT NULL,
     UNIQUE (event_id)
-) ENGINE = INNODB;
+) ;
 
 CREATE INDEX IF NOT EXISTS redactions_event_id ON redactions (event_id);
 CREATE INDEX IF NOT EXISTS redactions_redacts ON redactions (redacts);
diff --git a/synapse/storage/schema/full_schemas/11/room_aliases.sql b/synapse/storage/schema/full_schemas/11/room_aliases.sql
index 1e706aac2b..220df87573 100644
--- a/synapse/storage/schema/full_schemas/11/room_aliases.sql
+++ b/synapse/storage/schema/full_schemas/11/room_aliases.sql
@@ -14,12 +14,12 @@
  */
 
 CREATE TABLE IF NOT EXISTS room_aliases(
-    room_alias VARCHAR(255) NOT NULL,
+    room_alias VARBINARY(255) NOT NULL,
     room_id VARCHAR(255) NOT NULL,
     UNIQUE (room_alias)
-) ENGINE = INNODB;
+) ;
 
 CREATE TABLE IF NOT EXISTS room_alias_servers(
-    room_alias VARCHAR(255) NOT NULL,
+    room_alias VARBINARY(255) NOT NULL,
     server VARCHAR(255) NOT NULL
-) ENGINE = INNODB;
+) ;
diff --git a/synapse/storage/schema/full_schemas/11/state.sql b/synapse/storage/schema/full_schemas/11/state.sql
index be9dc2920d..40584a325f 100644
--- a/synapse/storage/schema/full_schemas/11/state.sql
+++ b/synapse/storage/schema/full_schemas/11/state.sql
@@ -17,7 +17,7 @@ CREATE TABLE IF NOT EXISTS state_groups(
     id VARCHAR(20) PRIMARY KEY,
     room_id VARCHAR(255) NOT NULL,
     event_id VARCHAR(255) NOT NULL
-) ENGINE = INNODB;
+) ;
 
 CREATE TABLE IF NOT EXISTS state_groups_state(
     state_group VARCHAR(20) NOT NULL,
@@ -25,13 +25,13 @@ CREATE TABLE IF NOT EXISTS state_groups_state(
     type VARCHAR(255) NOT NULL,
     state_key VARCHAR(255) NOT NULL,
     event_id VARCHAR(255) NOT NULL
-) ENGINE = INNODB;
+) ;
 
 CREATE TABLE IF NOT EXISTS event_to_state_groups(
     event_id VARCHAR(255) NOT NULL,
     state_group VARCHAR(255) NOT NULL,
     UNIQUE (event_id)
-) ENGINE = INNODB;
+) ;
 
 CREATE INDEX IF NOT EXISTS state_groups_id ON state_groups(id);
 
diff --git a/synapse/storage/schema/full_schemas/11/transactions.sql b/synapse/storage/schema/full_schemas/11/transactions.sql
index bd13bba8c2..d33bdfb301 100644
--- a/synapse/storage/schema/full_schemas/11/transactions.sql
+++ b/synapse/storage/schema/full_schemas/11/transactions.sql
@@ -21,7 +21,7 @@ CREATE TABLE IF NOT EXISTS received_transactions(
     response_json BLOB,
     has_been_referenced BOOL default 0, -- Whether thishas been referenced by a prev_tx
     UNIQUE (transaction_id, origin)
-) ENGINE = INNODB;
+) ;
 
 CREATE INDEX IF NOT EXISTS transactions_have_ref ON received_transactions(origin, has_been_referenced);-- WHERE has_been_referenced = 0;
 
@@ -35,7 +35,7 @@ CREATE TABLE IF NOT EXISTS sent_transactions(
     response_code INTEGER DEFAULT 0,
     response_json BLOB,
     ts BIGINT
-) ENGINE = INNODB;
+) ;
 
 CREATE INDEX IF NOT EXISTS sent_transaction_dest ON sent_transactions(destination);
 CREATE INDEX IF NOT EXISTS sent_transaction_txn_id ON sent_transactions(transaction_id);
@@ -51,7 +51,7 @@ CREATE TABLE IF NOT EXISTS transaction_id_to_pdu(
     pdu_id VARCHAR(255),
     pdu_origin VARCHAR(255),
     UNIQUE (transaction_id, destination)
-) ENGINE = INNODB;
+) ;
 
 CREATE INDEX IF NOT EXISTS transaction_id_to_pdu_dest ON transaction_id_to_pdu(destination);
 
@@ -60,4 +60,4 @@ CREATE TABLE IF NOT EXISTS destinations(
     destination VARCHAR(255) PRIMARY KEY,
     retry_last_ts BIGINT,
     retry_interval INTEGER
-) ENGINE = INNODB;
+) ;
diff --git a/synapse/storage/schema/full_schemas/11/users.sql b/synapse/storage/schema/full_schemas/11/users.sql
index 55bffb22f3..28909f5805 100644
--- a/synapse/storage/schema/full_schemas/11/users.sql
+++ b/synapse/storage/schema/full_schemas/11/users.sql
@@ -14,20 +14,20 @@
  */
 CREATE TABLE IF NOT EXISTS users(
     name VARCHAR(255),
-    password_hash VARBINARY(255),
+    password_hash VARCHAR(255),
     creation_ts BIGINT,
     admin BOOL DEFAULT 0 NOT NULL,
     UNIQUE(name)
-) ENGINE = INNODB;
+) ;
 
 CREATE TABLE IF NOT EXISTS access_tokens(
-    id INTEGER PRIMARY KEY AUTO_INCREMENT,
+    id BIGINT PRIMARY KEY,
     user_id VARCHAR(255) NOT NULL,
     device_id VARCHAR(255),
     token VARCHAR(255) NOT NULL,
     last_used BIGINT,
     UNIQUE(token)
-) ENGINE = INNODB;
+) ;
 
 CREATE TABLE IF NOT EXISTS user_ips (
     user VARCHAR(255) NOT NULL,
@@ -37,6 +37,6 @@ CREATE TABLE IF NOT EXISTS user_ips (
     user_agent VARCHAR(255) NOT NULL,
     last_seen BIGINT NOT NULL,
     UNIQUE (user, access_token, ip, user_agent)
-) ENGINE = INNODB;
+) ;
 
 CREATE INDEX IF NOT EXISTS user_ips_user ON user_ips(user);
diff --git a/synapse/storage/signatures.py b/synapse/storage/signatures.py
index 35bba854f9..f051828630 100644
--- a/synapse/storage/signatures.py
+++ b/synapse/storage/signatures.py
@@ -54,7 +54,7 @@ class SignatureStore(SQLBaseStore):
             {
                 "event_id": event_id,
                 "algorithm": algorithm,
-                "hash": hash_bytes,
+                "hash": buffer(hash_bytes),
             },
         )
 
@@ -116,7 +116,7 @@ class SignatureStore(SQLBaseStore):
             {
                 "event_id": event_id,
                 "algorithm": algorithm,
-                "hash": hash_bytes,
+                "hash": buffer(hash_bytes),
             },
         )
 
@@ -160,7 +160,7 @@ class SignatureStore(SQLBaseStore):
                 "event_id": event_id,
                 "signature_name": signature_name,
                 "key_id": key_id,
-                "signature": signature_bytes,
+                "signature": buffer(signature_bytes),
             },
         )
 
@@ -193,6 +193,6 @@ class SignatureStore(SQLBaseStore):
                 "event_id": event_id,
                 "prev_event_id": prev_event_id,
                 "algorithm": algorithm,
-                "hash": hash_bytes,
+                "hash": buffer(hash_bytes),
             },
         )
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index 3a310cd003..e6bb5a8077 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -433,12 +433,6 @@ class StreamStore(SQLBaseStore):
 
         defer.returnValue(self.min_token)
 
-    def get_next_stream_id(self):
-        with self._next_stream_id_lock:
-            i = self._next_stream_id
-            self._next_stream_id += 1
-            return i
-
     def _get_room_events_max_id_txn(self, txn):
         txn.execute(
             "SELECT MAX(stream_ordering) as m FROM events"
diff --git a/synapse/util/retryutils.py b/synapse/util/retryutils.py
index 4e82232796..a42138f556 100644
--- a/synapse/util/retryutils.py
+++ b/synapse/util/retryutils.py
@@ -60,7 +60,7 @@ def get_retry_limiter(destination, clock, store, **kwargs):
 
     if retry_timings:
         retry_last_ts, retry_interval = (
-            retry_timings.retry_last_ts, retry_timings.retry_interval
+            retry_timings["retry_last_ts"], retry_timings["retry_interval"]
         )
 
         now = int(clock.time_msec())